diff --git a/src/cluster.jl b/src/cluster.jl
index 82c77c8..0ac9eb4 100644
--- a/src/cluster.jl
+++ b/src/cluster.jl
@@ -99,7 +99,7 @@ mutable struct Worker
     del_msgs::Array{Any,1} # XXX: Could del_msgs and add_msgs be Channels?
     add_msgs::Array{Any,1}
     @atomic gcflag::Bool
-    state::WorkerState
+    @atomic state::WorkerState
     c_state::Threads.Condition # wait for state changes, lock for state
     ct_time::Float64 # creation time
     conn_func::Any # used to setup connections lazily
@@ -145,15 +145,13 @@ end
 
 function set_worker_state(w, state)
     lock(w.c_state) do
-        w.state = state
+        @atomic w.state = state
         notify(w.c_state; all=true)
     end
 end
 
 function check_worker_state(w::Worker)
-    lock(w.c_state)
     if w.state === W_CREATED
-        unlock(w.c_state)
         if !isclusterlazy()
             if PGRP.topology === :all_to_all
                 # Since higher pids connect with lower pids, the remote worker
@@ -173,9 +171,8 @@ function check_worker_state(w::Worker)
             errormonitor(t)
             wait_for_conn(w)
         end
-    else
-        unlock(w.c_state)
     end
+    return nothing
 end
 
 exec_conn_func(id::Int) = exec_conn_func(worker_from_id(id)::Worker)
@@ -193,9 +190,7 @@ function exec_conn_func(w::Worker)
 end
 
 function wait_for_conn(w)
-    lock(w.c_state)
     if w.state === W_CREATED
-        unlock(w.c_state)
         timeout = worker_timeout() - (time() - w.ct_time)
         timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")
 
@@ -210,8 +205,6 @@ function wait_for_conn(w)
             wait(w.c_state)
             w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
         end
-    else
-        unlock(w.c_state)
     end
     nothing
 end
@@ -667,8 +660,11 @@ function create_worker(manager, wconfig)
         for jw in PGRP.workers
             if (jw.id != 1) && (jw.id < w.id)
                 # wait for wl to join
+                # Check the state while holding the lock: set_worker_state changes
+                # the state and notifies under this same lock, so a check made
+                # outside it could miss the notification and wait here forever.
                 lock(jw.c_state) do
                     if jw.state === W_CREATED
                         wait(jw.c_state)
                     end
                 end