Thread-safety improvements. Add tests with multiple threads. CI improvements. #122

Merged (16 commits, Jan 26, 2025)
25 changes: 15 additions & 10 deletions .github/workflows/ci.yml
@@ -17,21 +17,25 @@ concurrency:

jobs:
test:
name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
name: julia -t${{ matrix.threads}} - ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
runs-on: ${{ matrix.os }}
timeout-minutes: 30
strategy:
fail-fast: false
matrix:
version:
- 'nightly'
os:
- ubuntu-latest
- macOS-latest
- windows-latest
arch:
- x64
- x86
threads:
# - '1'
- '4,4'
version: [nightly]
os: [ubuntu-latest, windows-latest, macOS-latest]
arch: [x64, x86, aarch64]
exclude:
- os: ubuntu-latest
arch: aarch64
- os: windows-latest
arch: aarch64
- os: macOS-latest
arch: x64
- os: macOS-latest
arch: x86
steps:
Expand All @@ -44,6 +48,7 @@ jobs:
- uses: julia-actions/julia-runtest@v1
env:
JULIA_DISTRIBUTED_TESTING_STANDALONE: 1
JULIA_NUM_THREADS: '${{ matrix.threads}}'
- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v5
with:
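The new `threads` matrix entry feeds `JULIA_NUM_THREADS='4,4'` into the test run; on Julia 1.9 and later the comma form sizes the `:default` and `:interactive` thread pools separately. A minimal sketch (assuming that Julia version or later) of how a test could confirm the pools it was given:

```julia
# Sanity check for a run started with JULIA_NUM_THREADS='4,4':
# the first number sizes the :default pool, the second the :interactive pool.
using Base.Threads

@assert nthreads(:default) == 4       # worker threads for ordinary tasks
@assert nthreads(:interactive) == 4   # threads reserved for latency-sensitive tasks
```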
64 changes: 30 additions & 34 deletions src/cluster.jl
@@ -151,7 +151,7 @@
end

function check_worker_state(w::Worker)
if w.state === W_CREATED
if (@atomic w.state) === W_CREATED
if !isclusterlazy()
if PGRP.topology === :all_to_all
# Since higher pids connect with lower pids, the remote worker
@@ -163,10 +163,10 @@
else
w.ct_time = time()
if myid() > w.id
t = Threads.@spawn Threads.threadpool() exec_conn_func(w)
t = @async exec_conn_func(w)
else
# route request via node 1
t = Threads.@spawn Threads.threadpool() remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
t = @async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
end
errormonitor(t)
wait_for_conn(w)
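The bare `w.state` comparisons are replaced with `(@atomic w.state)` reads because the state field is now declared atomic. A minimal sketch of that pattern (`ToyWorker` is a hypothetical stand-in, not the real `Worker` definition):

```julia
# Fields declared `@atomic` must also be read and written with `@atomic`,
# which is what the `(@atomic w.state)` comparisons above do.
mutable struct ToyWorker
    id::Int
    @atomic state::Symbol
end

w = ToyWorker(2, :W_CREATED)
(@atomic w.state) === :W_CREATED    # atomic read, as used in the checks above
@atomic w.state = :W_CONNECTED      # atomic (sequentially consistent) write
```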
@@ -190,20 +190,14 @@
end

function wait_for_conn(w)
if w.state === W_CREATED
if (@atomic w.state) === W_CREATED
timeout = worker_timeout() - (time() - w.ct_time)
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")

T = Threads.@spawn Threads.threadpool() begin
sleep($timeout)
lock(w.c_state) do
notify(w.c_state; all=true)
end
end
errormonitor(T)
lock(w.c_state) do
wait(w.c_state)
w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
if timedwait(() -> (@atomic w.state) === W_CONNECTED, timeout) === :timed_out
# Notify any waiters on the state and throw
@lock w.c_state notify(w.c_state)
error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")

end
end
nothing
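The rewritten `wait_for_conn` swaps the sleep-and-notify helper task for `timedwait`, which polls a predicate until it returns true or the timeout elapses and reports `:ok` or `:timed_out`. A reduced sketch, with a plain atomic flag standing in for the worker state:

```julia
connected = Threads.Atomic{Bool}(false)            # stand-in for the W_CONNECTED check
Threads.@spawn (sleep(0.5); connected[] = true)    # simulated incoming connection

if timedwait(() -> connected[], 5.0; pollint=0.1) === :timed_out
    error("peer didn't connect within 5 seconds")
end
```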
@@ -258,7 +252,7 @@
else
sock = listen(interface, LPROC.bind_port)
end
errormonitor(Threads.@spawn while isopen(sock)
errormonitor(@async while isopen(sock)
client = accept(sock)
process_messages(client, client, true)
end)
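Several of these listener and helper tasks are switched from `Threads.@spawn` back to `@async`. The observable difference is task stickiness: `@async` pins the task to the thread that created it, while `Threads.@spawn` produces a task the scheduler may migrate. A quick way to see this:

```julia
t_async = @async Threads.threadid()
t_spawn = Threads.@spawn Threads.threadid()

t_async.sticky   # true:  stays on the thread that created it
t_spawn.sticky   # false: free to run on any thread in its pool
fetch(t_async), fetch(t_spawn)
```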
@@ -290,7 +284,7 @@


function redirect_worker_output(ident, stream)
t = Threads.@spawn while !eof(stream)
t = @async while !eof(stream)
line = readline(stream)
if startswith(line, " From worker ")
# stdout's of "additional" workers started from an initial worker on a host are not available
@@ -329,7 +323,7 @@
leader = String[]
try
while ntries > 0
readtask = Threads.@spawn Threads.threadpool() readline(io)
readtask = @async readline(io)
yield()
while !istaskdone(readtask) && ((time_ns() - t0) < timeout)
sleep(0.05)
@@ -430,7 +424,7 @@

```julia
# On busy clusters, call `addprocs` asynchronously
t = Threads.@spawn addprocs(...)
t = @async addprocs(...)
```

```julia
@@ -496,13 +490,14 @@
# call manager's `launch` is a separate task. This allows the master
# process initiate the connection setup process as and when workers come
# online
t_launch = Threads.@spawn Threads.threadpool() launch(manager, params, launched, launch_ntfy)
# NOTE: Must be `@async`. See FIXME above
t_launch = @async launch(manager, params, launched, launch_ntfy)

@sync begin
while true
if isempty(launched)
istaskdone(t_launch) && break
Threads.@spawn Threads.threadpool() begin
@async begin # NOTE: Must be `@async`. See FIXME above
sleep(1)
notify(launch_ntfy)
end
Expand All @@ -512,7 +507,8 @@
if !isempty(launched)
wconfig = popfirst!(launched)
let wconfig=wconfig
Threads.@spawn Threads.threadpool() setup_launched_worker(manager, wconfig, launched_q)
# NOTE: Must be `@async`. See FIXME above
@async setup_launched_worker(manager, wconfig, launched_q)
end
end
end
@@ -592,7 +588,7 @@
wconfig.port = port

let wconfig=wconfig
Threads.@spawn Threads.threadpool() begin
@async begin

pid = create_worker(manager, wconfig)
remote_do(redirect_output_from_additional_worker, frompid, pid, port)
push!(launched_q, pid)
@@ -660,7 +656,7 @@
for jw in PGRP.workers
if (jw.id != 1) && (jw.id < w.id)
# wait for wl to join
if jw.state === W_CREATED
if (@atomic jw.state) === W_CREATED
lock(jw.c_state) do
wait(jw.c_state)
end
@@ -688,7 +684,7 @@

for wl in wlist
lock(wl.c_state) do
if wl.state === W_CREATED
if (@atomic wl.state) === W_CREATED
# wait for wl to join
wait(wl.c_state)
end
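Here the connection handshake combines an atomic state read with a wait on `c_state`, which behaves like a `Threads.Condition`: both `wait` and `notify` must run while holding its lock. A reduced sketch of that handshake (names are illustrative):

```julia
cond  = Threads.Condition()
ready = Threads.Atomic{Bool}(false)     # stand-in for the worker state field

waiter = Threads.@spawn begin
    lock(cond) do
        while !ready[]
            wait(cond)                  # releases the lock while blocked
        end
    end
end

ready[] = true
lock(cond) do
    notify(cond)
end
wait(waiter)
```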
@@ -758,7 +754,7 @@
end

errormonitor(
Threads.@spawn begin
@async begin
timeout = worker_timeout()
if timedwait(() -> !haskey(map_pid_wrkr, 1), timeout) === :timed_out
print(stderr, "Master process (id 1) could not connect within $(timeout) seconds.\nexiting.\n")
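This watchdog is a fire-and-forget task, so it is wrapped in `errormonitor`, which logs a failing task's exception to stderr instead of letting it sit unnoticed until someone waits on the task. For example:

```julia
t = errormonitor(@async (sleep(0.1); error("watchdog failed")))
sleep(0.2)          # the error has already been reported on stderr
istaskfailed(t)     # true; the task object is still inspectable
```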
@@ -890,7 +886,7 @@
n = length(PGRP.workers)
# filter out workers in the process of being setup/shutdown.
for jw in PGRP.workers
if !isa(jw, LocalProcess) && (jw.state !== W_CONNECTED)
if !isa(jw, LocalProcess) && ((@atomic jw.state) !== W_CONNECTED)
n = n - 1
end
end
@@ -941,7 +937,7 @@
function procs()
if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
# filter out workers in the process of being setup/shutdown.
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)]
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === W_CONNECTED)]
else
return Int[x.id for x in PGRP.workers]
end
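With the atomic reads in place, these queries still count only workers in the `W_CONNECTED` state. On a master with two healthy local workers (shown via the stdlib `Distributed` names, which this code implements), the results look like:

```julia
using Distributed

addprocs(2)
nprocs()    # 3: the master plus the two connected workers
procs()     # [1, 2, 3]
workers()   # [2, 3]
```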
@@ -950,7 +946,7 @@
function id_in_procs(id) # faster version of `id in procs()`
if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
for x in PGRP.workers
if (x.id::Int) == id && (isa(x, LocalProcess) || (x::Worker).state === W_CONNECTED)
if (x.id::Int) == id && (isa(x, LocalProcess) || (@atomic (x::Worker).state) === W_CONNECTED)
return true
end
end
@@ -972,7 +968,7 @@
"""
function procs(pid::Integer)
if myid() == 1
all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)]
all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === W_CONNECTED)]

if (pid == 1) || (isa(map_pid_wrkr[pid].manager, LocalManager))
Int[x.id for x in filter(w -> (w.id==1) || (isa(w.manager, LocalManager)), all_workers)]
else
@@ -1050,13 +1046,13 @@

pids = vcat(pids...)
if waitfor == 0
t = Threads.@spawn Threads.threadpool() _rmprocs(pids, typemax(Int))
t = @async _rmprocs(pids, typemax(Int))

yield()
return t
else
_rmprocs(pids, waitfor)
# return a dummy task object that user code can wait on.
return Threads.@spawn Threads.threadpool() nothing
return @async nothing
end
end
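Either branch of `rmprocs` now hands back a task. A usage sketch of the two modes:

```julia
using Distributed

addprocs(2)
t = rmprocs(workers(); waitfor=0)   # returns a Task immediately; removal runs in the background
wait(t)                             # block until the workers are actually gone
nprocs() == 1                       # only the master remains
```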

@@ -1079,11 +1075,11 @@

start = time_ns()
while (time_ns() - start) < waitfor*1e9
all(w -> w.state === W_TERMINATED, rmprocset) && break
all(w -> (@atomic w.state) === W_TERMINATED, rmprocset) && break
sleep(min(0.1, waitfor - (time_ns() - start)/1e9))
end

unremoved = [wrkr.id for wrkr in filter(w -> w.state !== W_TERMINATED, rmprocset)]
unremoved = [wrkr.id for wrkr in filter(w -> (@atomic w.state) !== W_TERMINATED, rmprocset)]
if length(unremoved) > 0
estr = string("rmprocs: pids ", unremoved, " not terminated after ", waitfor, " seconds.")
throw(ErrorException(estr))
@@ -1239,7 +1235,7 @@
@assert myid() == 1
@sync begin
for pid in pids
Threads.@spawn Threads.threadpool() interrupt(pid)
@async interrupt(pid)

end
end
end
4 changes: 2 additions & 2 deletions src/macros.jl
@@ -230,7 +230,7 @@ function remotecall_eval(m::Module, procs, ex)
# execute locally last as we do not want local execution to block serialization
# of the request to remote nodes.
for _ in 1:run_locally
Threads.@spawn Threads.threadpool() Core.eval(m, ex)
@async Core.eval(m, ex)
end
end
nothing
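`remotecall_eval` is the machinery behind `@everywhere`; evaluating locally in an `@async` task keeps local execution from blocking serialization of the request to remote nodes, as the comment above notes. Typical use goes through the macro:

```julia
using Distributed

addprocs(2)
@everywhere double(x) = 2x                        # evaluated on the master and every worker
remotecall_fetch(double, first(workers()), 21)    # 42
```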
@@ -275,7 +275,7 @@ function preduce(reducer, f, R)
end

function pfor(f, R)
t = Threads.@spawn Threads.threadpool() @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
t = @async @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
@spawnat :any f(R, first(c), last(c))
end
errormonitor(t)
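`pfor` backs the non-reducing form of `@distributed`; it returns the monitored task, so callers prefix the loop with `@sync` when they need to wait for all chunks. A usage sketch:

```julia
using Distributed

addprocs(2)
@sync @distributed for i in 1:4
    println("iteration $i ran on worker $(myid())")
end
```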
18 changes: 13 additions & 5 deletions src/managers.jl
@@ -113,7 +113,7 @@

* `exeflags`: additional flags passed to the worker processes. It can either be a `Cmd`, a `String`
holding one flag, or a collection of strings, with one element per flag.
E.g. `\`--threads=auto project=.\``, `"--compile-trace=stderr"` or `["--threads=auto", "--compile=all"]`.
E.g. `\`--threads=auto project=.\``, `"--compile-trace=stderr"` or `["--threads=auto", "--compile=all"]`.

* `topology`: Specifies how the workers connect to each other. Sending a message between
unconnected workers results in an error.
@@ -178,7 +178,7 @@
# Wait for all launches to complete.
@sync for (i, (machine, cnt)) in enumerate(manager.machines)
let machine=machine, cnt=cnt
Threads.@spawn Threads.threadpool() try
@async try

launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy)
catch e
print(stderr, "exception launching on machine $(machine) : $(e)\n")
@@ -740,16 +740,24 @@
nothing
end

function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeout = 15, term_timeout = 15)
function kill(manager::LocalManager, pid::Int, config::WorkerConfig; profile_wait = 6, exit_timeout = 15, term_timeout = 15)
# profile_wait = 6 is 1s for profile, 5s for the report to show
# First, try sending `exit()` to the remote over the usual control channels
remote_do(exit, pid)

timer_task = Threads.@spawn Threads.threadpool() begin
timer_task = @async begin
sleep(exit_timeout)

# Check to see if our child exited, and if not, send an actual kill signal
if !process_exited(config.process)
@warn("Failed to gracefully kill worker $(pid), sending SIGQUIT")
@warn "Failed to gracefully kill worker $(pid)"
profile_sig = Sys.iswindows() ? nothing : Sys.isbsd() ? ("SIGINFO", 29) : ("SIGUSR1" , 10)
if profile_sig !== nothing
@warn("Sending profile $(profile_sig[1]) to worker $(pid)")
kill(config.process, profile_sig[2])
sleep(profile_wait)
end
@warn("Sending SIGQUIT to worker $(pid)")
kill(config.process, Base.SIGQUIT)

sleep(term_timeout)
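The new escalation path first sends the platform's profiling signal so a stuck worker dumps a task/profile report (recent Julia versions handle `SIGUSR1` on Linux and `SIGINFO` on BSD/macOS that way), waits `profile_wait` seconds, then falls back to `SIGQUIT`. A hedged, manual demo on a scratch child process (POSIX only; a `julia` binary on `PATH` is assumed):

```julia
p = run(`julia -e 'sleep(60)'`; wait=false)   # stand-in for a stuck worker process
profile_sig = Sys.isbsd() ? 29 : 10           # SIGINFO on BSD/macOS, SIGUSR1 on Linux
kill(p, profile_sig)                          # child prints a profile/task report to stderr
sleep(6)                                      # default profile_wait: ~1s sampling plus report time
kill(p, Base.SIGQUIT)                         # escalate, as kill(::LocalManager, ...) now does
```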
4 changes: 2 additions & 2 deletions src/messages.jl
@@ -194,13 +194,13 @@
function flush_gc_msgs()
try
for w in (PGRP::ProcessGroup).workers
if isa(w,Worker) && (w.state == W_CONNECTED) && w.gcflag
if isa(w,Worker) && ((@atomic w.state) == W_CONNECTED) && w.gcflag
flush_gc_msgs(w)
end
end
catch e
bt = catch_backtrace()
Threads.@spawn showerror(stderr, e, bt)
@async showerror(stderr, e, bt)

Check warning on line 203 in src/messages.jl

View check run for this annotation

Codecov / codecov/patch

src/messages.jl#L203

Added line #L203 was not covered by tests
end
end
