diff --git a/src/cluster.jl b/src/cluster.jl index 0e8df8c..0b42961 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -163,10 +163,10 @@ function check_worker_state(w::Worker) else w.ct_time = time() if myid() > w.id - t = Threads.@spawn exec_conn_func(w) + t = Threads.@spawn Threads.threadpool() exec_conn_func(w) else # route request via node 1 - t = Threads.@spawn remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid()) + t = Threads.@spawn Threads.threadpool() remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid()) end errormonitor(t) wait_for_conn(w) @@ -194,7 +194,7 @@ function wait_for_conn(w) timeout = worker_timeout() - (time() - w.ct_time) timeout <= 0 && error("peer $(w.id) has not connected to $(myid())") - T = Threads.@spawn begin + T = Threads.@spawn Threads.threadpool() begin sleep($timeout) lock(w.c_state) do notify(w.c_state; all=true) @@ -329,7 +329,7 @@ function read_worker_host_port(io::IO) leader = String[] try while ntries > 0 - readtask = Threads.@spawn readline(io) + readtask = Threads.@spawn Threads.threadpool() readline(io) yield() while !istaskdone(readtask) && ((time_ns() - t0) < timeout) sleep(0.05) @@ -496,13 +496,13 @@ function addprocs_locked(manager::ClusterManager; kwargs...) # call manager's `launch` is a separate task. This allows the master # process initiate the connection setup process as and when workers come # online - t_launch = Threads.@spawn launch(manager, params, launched, launch_ntfy) + t_launch = Threads.@spawn Threads.threadpool() launch(manager, params, launched, launch_ntfy) @sync begin while true if isempty(launched) istaskdone(t_launch) && break - Threads.@spawn begin + Threads.@spawn Threads.threadpool() begin sleep(1) notify(launch_ntfy) end @@ -512,7 +512,7 @@ function addprocs_locked(manager::ClusterManager; kwargs...) if !isempty(launched) wconfig = popfirst!(launched) let wconfig=wconfig - Threads.@spawn setup_launched_worker(manager, wconfig, launched_q) + Threads.@spawn Threads.threadpool() setup_launched_worker(manager, wconfig, launched_q) end end end @@ -592,7 +592,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch wconfig.port = port let wconfig=wconfig - Threads.@spawn begin + Threads.@spawn Threads.threadpool() begin pid = create_worker(manager, wconfig) remote_do(redirect_output_from_additional_worker, frompid, pid, port) push!(launched_q, pid) @@ -1050,13 +1050,13 @@ function rmprocs(pids...; waitfor=typemax(Int)) pids = vcat(pids...) if waitfor == 0 - t = Threads.@spawn _rmprocs(pids, typemax(Int)) + t = Threads.@spawn Threads.threadpool() _rmprocs(pids, typemax(Int)) yield() return t else _rmprocs(pids, waitfor) # return a dummy task object that user code can wait on. - return Threads.@spawn nothing + return Threads.@spawn Threads.threadpool() nothing end end @@ -1239,7 +1239,7 @@ function interrupt(pids::AbstractVector=workers()) @assert myid() == 1 @sync begin for pid in pids - Threads.@spawn interrupt(pid) + Threads.@spawn Threads.threadpool() interrupt(pid) end end end diff --git a/src/macros.jl b/src/macros.jl index ac5029e..5f3ce1e 100644 --- a/src/macros.jl +++ b/src/macros.jl @@ -230,7 +230,7 @@ function remotecall_eval(m::Module, procs, ex) # execute locally last as we do not want local execution to block serialization # of the request to remote nodes. for _ in 1:run_locally - Threads.@spawn Core.eval(m, ex) + Threads.@spawn Threads.threadpool() Core.eval(m, ex) end end nothing @@ -275,7 +275,7 @@ function preduce(reducer, f, R) end function pfor(f, R) - t = Threads.@spawn @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers()) + t = Threads.@spawn Threads.threadpool() @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers()) @spawnat :any f(R, first(c), last(c)) end errormonitor(t) diff --git a/src/managers.jl b/src/managers.jl index b506c42..cf48c6f 100644 --- a/src/managers.jl +++ b/src/managers.jl @@ -178,7 +178,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy: # Wait for all launches to complete. @sync for (i, (machine, cnt)) in enumerate(manager.machines) let machine=machine, cnt=cnt - Threads.@spawn try + Threads.@spawn Threads.threadpool() try launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy) catch e print(stderr, "exception launching on machine $(machine) : $(e)\n") @@ -744,7 +744,7 @@ function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeou # First, try sending `exit()` to the remote over the usual control channels remote_do(exit, pid) - timer_task = Threads.@spawn begin + timer_task = Threads.@spawn Threads.threadpool() begin sleep(exit_timeout) # Check to see if our child exited, and if not, send an actual kill signal diff --git a/src/remotecall.jl b/src/remotecall.jl index 3cd1207..eda3899 100644 --- a/src/remotecall.jl +++ b/src/remotecall.jl @@ -322,7 +322,7 @@ function process_worker(rr) msg = (remoteref_id(rr), myid()) # Needs to acquire a lock on the del_msg queue - T = Threads.@spawn begin + T = Threads.@spawn Threads.threadpool() begin publish_del_msg!($w, $msg) end Base.errormonitor(T)