Skip to content

Commit

Permalink
runner: improve semantics of signal propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
ucirello committed Aug 18, 2024
1 parent 256c5bd commit 1255940
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 22 deletions.
13 changes: 7 additions & 6 deletions internal/runner/cmd_others.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ import (

func commandContext(cmd string) (*exec.Cmd, func() error) {
c := exec.Command("sh", "-c", cmd)
c.WaitDelay = 1 * time.Minute
c.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
return c, func() error {
if err := syscall.Kill(-c.Process.Pid, syscall.SIGKILL); err != nil {
return err
}
if err := c.Process.Kill(); err != nil {
return err
if c.Process == nil {
return nil
}
pgid := -c.Process.Pid
_ = syscall.Kill(pgid, syscall.SIGINT)
time.AfterFunc(5*time.Second, func() {
_ = syscall.Kill(pgid, syscall.SIGKILL)
})
return nil
}
}
15 changes: 4 additions & 11 deletions internal/runner/cmd_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,15 @@
package runner

import (
"os"
"context"
"os/exec"
)

func commandContext(cmd string) (*exec.Cmd, func() error) {
c := exec.Command("cmd", "/c", cmd)
ctx, cancel := context.WithCancel(context.Background())
c := exec.CommandContext(ctx, "cmd", "/c", cmd)
return c, func() error {
if c.Process == nil {
return nil
}
if err := c.Process.Signal(os.Interrupt); err != nil {
return err
}
if err := c.Process.Kill(); err != nil {
return err
}
cancel()
return nil
}
}
27 changes: 22 additions & 5 deletions internal/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,16 @@ func (r *Runner) Start(rootCtx context.Context) error {
return err
}

run := make(chan string)
run := make(chan string, 1)
fileHashes := make(map[string]string) // fn to hash
c, cancel := context.WithCancel(rootCtx)
done := make(chan struct{})
close(done)
for {
select {
case <-rootCtx.Done():
cancel()
<-done
return nil
case fn := <-updates:
newHash := calcFileHash(fn)
Expand All @@ -257,13 +260,20 @@ func (r *Runner) Start(rootCtx context.Context) error {

if l := len(updates); l == 0 {
cancel()
go func() { run <- fn }()
select {
case run <- fn:
default:
}
} else {
log.Println("builds pending before application start:", l)
}
case fn := <-run:
c, cancel = context.WithCancel(rootCtx)
go r.runNonBuilds(rootCtx, c, fn)
done = make(chan struct{})
go func() {
defer close(done)
r.runNonBuilds(rootCtx, c, fn)
}()
}
}
}
Expand Down Expand Up @@ -331,6 +341,7 @@ func (r *Runner) runNonBuilds(rootCtx, ctx context.Context, changedFileName stri
groups := make(map[string]context.Context)
ready := make(chan struct{})

var wg sync.WaitGroup
for j, sv := range r.Processes {
if strings.HasPrefix(sv.Name, "build") {
continue
Expand All @@ -357,15 +368,19 @@ func (r *Runner) runNonBuilds(rootCtx, ctx context.Context, changedFileName stri

if sv.Restart == Loop && r.currentGeneration == 0 {
loopSvcCtx := oversight.WithContext(rootCtx)
wg.Add(1)
oversight.Add(loopSvcCtx, func(ctx context.Context) error {
defer wg.Done()
<-ready
r.startProcess(ctx, sv, i, pc, changedFileName, io.Discard)
return nil
}, oversight.RestartWith(oversight.Permanent()))
portCount++
} else if sv.Restart == Temporary && r.currentGeneration == 0 {
temporarySvcCtx := oversight.WithContext(rootCtx)
wg.Add(1)
oversight.Add(temporarySvcCtx, func(ctx context.Context) error {
defer wg.Done()
<-ready
r.startProcess(ctx, sv, i, pc, changedFileName, io.Discard)
return nil
Expand All @@ -382,7 +397,9 @@ func (r *Runner) runNonBuilds(rootCtx, ctx context.Context, changedFileName stri
case OnFailure:
restart = oversight.Transient()
}
wg.Add(1)
oversight.Add(procCtx, func(ctx context.Context) error {
defer wg.Done()
<-ready
ok := r.startProcess(ctx, sv, i, pc, changedFileName, io.Discard)
if !ok && sv.Restart == OnFailure {
Expand All @@ -400,8 +417,8 @@ func (r *Runner) runNonBuilds(rootCtx, ctx context.Context, changedFileName stri
}
r.currentGeneration++
close(ready)

<-ctx.Done()
wg.Wait()
}

func discoveryEnvVar(name string, procCount int) string {
Expand Down Expand Up @@ -489,9 +506,9 @@ func (r *Runner) startProcess(ctx context.Context, sv *ProcessType, procCount, p
go func() {
select {
case <-ctx.Done():
stopCmd()
case <-done:
}
stopCmd()
}()
defer close(done)
if err := c.Run(); err != nil {
Expand Down

0 comments on commit 1255940

Please sign in to comment.