Skip to content

Commit

Permalink
run: exit gracefully when an error happens
Browse files Browse the repository at this point in the history
When an error happened during "baur run" execution, the command terminated
immediately.
It did not wait for successfully executed tasks to finish uploading their
artifacts and recording their results in the database.
This meant results were lost and tasks had to be rerun unnecessarily in these
cases.

The command is changed so that, when an error happens, it skips all queued task
executions and waits for all uploads and db record operations to finish instead.
Task executions that are already running when an error happens run to completion
and are not canceled.

When a task run is skipped because another operation failed, it is printed with
a "skipped" status to the console.
  • Loading branch information
fho committed Jun 28, 2021
1 parent 49ff93d commit 8e59c71
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 35 deletions.
146 changes: 112 additions & 34 deletions internal/command/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package command

import (
"context"
"errors"
"fmt"
"math"
"strings"
"sync"
"time"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -64,6 +66,12 @@ The following Environment Variables are supported:
term.Highlight("DOCKER_CERT_PATH"),
term.Highlight("DOCKER_TLS_VERIFY"))

// Colorized status labels that are printed next to the task name when the
// outcome of an operation (task execution, upload, recording the run in the
// database) is reported on the console.
var (
statusStrSuccess = term.GreenHighlight("successful")
statusStrSkipped = term.YellowHighlight("skipped")
statusStrFailed = term.RedHighlight("failed")
)

// init registers the run command as a subcommand of rootCmd.
func init() {
rootCmd.AddCommand(&newRunCmd().Command)
}
Expand All @@ -72,12 +80,12 @@ type runCmd struct {
cobra.Command

// Cmdline parameters
skipUpload bool
force bool
inputStr []string
lookupInputStr string
taskRunners uint
showOutput bool
skipUpload bool
force bool
inputStr []string
lookupInputStr string
taskRunnerGoRoutines uint
showOutput bool

// other fields
storage storage.Storer
Expand All @@ -88,6 +96,10 @@ type runCmd struct {

uploadRoutinePool *routines.Pool
taskRunnerRoutinePool *routines.Pool
taskRunner *baur.TaskRunner

terminateGracefullyOnce sync.Once
errorHappened bool
}

func newRunCmd() *runCmd {
Expand All @@ -110,7 +122,7 @@ func newRunCmd() *runCmd {
"include a string as input, can be specified multiple times")
cmd.Flags().StringVar(&cmd.lookupInputStr, "lookup-input-str", "",
"if a run can not be found, try to find a run with this value as input-string")
cmd.Flags().UintVarP(&cmd.taskRunners, "parallel-runs", "p", 1,
cmd.Flags().UintVarP(&cmd.taskRunnerGoRoutines, "parallel-runs", "p", 1,
"specifies the max. number of tasks to run in parallel")
cmd.Flags().BoolVarP(&cmd.showOutput, "show-task-output", "o", false,
"show the output of tasks, if disabled the output is only shown "+
Expand All @@ -123,7 +135,7 @@ func newRunCmd() *runCmd {
func (c *runCmd) run(cmd *cobra.Command, args []string) {
var err error

if c.taskRunners == 0 {
if c.taskRunnerGoRoutines == 0 {
stderr.Printf("--parallel-runs must be greater than 0\n")
exitFunc(1)
}
Expand All @@ -137,7 +149,8 @@ func (c *runCmd) run(cmd *cobra.Command, args []string) {
defer c.storage.Close()

c.uploadRoutinePool = routines.NewPool(1) // run 1 upload in parallel with builds
c.taskRunnerRoutinePool = routines.NewPool(c.taskRunners)
c.taskRunnerRoutinePool = routines.NewPool(c.taskRunnerGoRoutines)
c.taskRunner = baur.NewTaskRunner()

c.dockerClient, err = docker.NewClient(log.StdLogger.Debugf)
exitOnErr(err)
Expand Down Expand Up @@ -184,77 +197,120 @@ func (c *runCmd) run(cmd *cobra.Command, args []string) {
ptCopy := pt
c.taskRunnerRoutinePool.Queue(func() {
task := ptCopy.task
runResult := c.runTask(task)
runResult, err := c.runTask(task)
if err != nil {
// error is printed in runTask()
c.terminateGracefully()
return
}

outputs, err := baur.OutputsFromTask(c.dockerClient, task)
exitOnErrf(err, task.ID())
if err != nil {
stderr.ErrPrintln(err, task.ID())
c.terminateGracefully()
return
}

if !outputsExist(task, outputs) {
exitFunc(1)
// error is printed in outputsExist()
c.terminateGracefully()
return
}

if c.skipUpload {
return
}

c.uploadRoutinePool.Queue(func() {
c.uploadAndRecord(ctx, ptCopy.task, ptCopy.inputs, outputs, runResult)
err := c.uploadAndRecord(ctx, ptCopy.task, ptCopy.inputs, outputs, runResult)
if err != nil {
// error is printed in uploadAndRecord()
c.terminateGracefully()
}
})
})
}

c.taskRunnerRoutinePool.Wait()

stdout.Println("all tasks executed, waiting for uploads to finish...")
stdout.Println("task execution finished, waiting for uploads to finish...")
c.uploadRoutinePool.Wait()
stdout.PrintSep()
stdout.Printf("finished in: %s\n",
term.FormatDuration(
time.Since(startTime),
),
)

if c.errorHappened {
exitFunc(1)
}
}

func (c *runCmd) runTask(task *baur.Task) *baur.RunResult {
result, err := baur.NewTaskRunner().Run(task)
exitOnErrf(err, "%s", task.ID())
func (c *runCmd) terminateGracefully() {
c.terminateGracefullyOnce.Do(func() {
c.taskRunner.SkipRuns(true)

if result.Result.ExitCode != 0 {
statusStr := term.RedHighlight("failed")
c.errorHappened = true

stderr.Printf("%s, %s execution of queued task runs\n",
term.RedHighlight("terminating"),
term.YellowHighlight("skipping"),
)
})
}

func (c *runCmd) runTask(task *baur.Task) (*baur.RunResult, error) {
result, err := c.taskRunner.Run(task)
if err != nil {
if errors.Is(err, baur.ErrTaskRunSkipped) {
stderr.Printf("%s: execution %s\n",
term.Highlight(task),
statusStrSkipped,
)
return nil, err
}

stderr.Printf("%s: execution %s, error: %s\n",
term.Highlight(task),
statusStrFailed,
err,
)
return nil, err
}

if result.Result.ExitCode != 0 {
if c.showOutput {
stderr.Printf("%s: execution %s (%s), command exited with code %d\n",
task,
statusStr,
term.Highlight(task),
statusStrFailed,
term.FormatDuration(
result.StopTime.Sub(result.StartTime),
),
result.ExitCode,
)
} else {
stderr.Printf("%s: execution %s (%s), command exited with code %d, output:\n%s\n",
task,
statusStr,
term.Highlight(task),
statusStrFailed,
term.FormatDuration(
result.StopTime.Sub(result.StartTime),
),
result.ExitCode,
result.StrOutput())
}

exitFunc(1)
return nil, fmt.Errorf("execution failed with exit code %d", result.ExitCode)
}

statusStr := term.GreenHighlight("successful")

stdout.TaskPrintf(task, "execution %s (%s)\n",
statusStr,
statusStrSuccess,
term.FormatDuration(
result.StopTime.Sub(result.StartTime),
),
)

return result
return result, nil
}

type pendingTask struct {
Expand All @@ -268,7 +324,7 @@ func (c *runCmd) uploadAndRecord(
inputs *baur.Inputs,
outputs []baur.Output,
runResult *baur.RunResult,
) {
) error {
var uploadResults []*baur.UploadResult

for _, output := range outputs {
Expand All @@ -280,7 +336,10 @@ func (c *runCmd) uploadAndRecord(
},
func(o baur.Output, result *baur.UploadResult) {
size, err := o.SizeBytes()
exitOnErrf(err, "%s: %s:", task.ID(), output)
if err != nil {
stderr.ErrPrintf(err, "%s: %s", task, output)
c.terminateGracefully()
}

bps := uint64(math.Round(float64(size) / result.Stop.Sub(result.Start).Seconds()))

Expand All @@ -292,14 +351,30 @@ func (c *runCmd) uploadAndRecord(
uploadResults = append(uploadResults, result)
},
)

exitOnErrf(err, "%s: %s", task.ID(), output)
if err != nil {
stderr.Printf("%s: %s: upload %s, %s\n",
term.Highlight(task),
output,
statusStrFailed,
err,
)
return err
}
}

id, err := baur.StoreRun(ctx, c.storage, c.vcsState, task, inputs, runResult, uploadResults)
exitOnErrf(err, "%s", task.ID())
if err != nil {
stderr.Printf("%s: recording build result in database %s, %s\n",
term.Highlight(task),
statusStrFailed,
err,
)
return err
}

stdout.TaskPrintf(task, "run stored in database with ID %s\n", term.Highlight(id))

return nil
}

func outputsExist(task *baur.Task, outputs []baur.Output) bool {
Expand All @@ -312,7 +387,10 @@ func outputsExist(task *baur.Task, outputs []baur.Output) bool {

for _, output := range outputs {
exists, err := output.Exists()
exitOnErrf(err, "%s:", task.ID())
if err != nil {
stderr.ErrPrintf(err, task.ID())
return false
}

if exists {
size, err := output.SizeBytes()
Expand Down
31 changes: 30 additions & 1 deletion pkg/baur/taskrunner.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
package baur

import (
"errors"
"fmt"
"sync/atomic"
"time"

"github.com/fatih/color"

"github.com/simplesurance/baur/v2/internal/exec"
)

// ErrTaskRunSkipped is returned when a task run was skipped instead of executed.
var ErrTaskRunSkipped = errors.New("task run skipped")

// TaskRunner executes the command of a task.
type TaskRunner struct{}
type TaskRunner struct {
skipEnabled uint32 // must be accessed via atomic operations
}

func NewTaskRunner() *TaskRunner {
return &TaskRunner{}
Expand All @@ -28,6 +35,10 @@ type RunResult struct {
func (t *TaskRunner) Run(task *Task) (*RunResult, error) {
startTime := time.Now()

if t.SkipRunsIsEnabled() {
return nil, ErrTaskRunSkipped
}

// TODO: rework exec, stream the output instead of storing all in memory
execResult, err := exec.Command(task.Command[0], task.Command[1:]...).
Directory(task.Directory).
Expand All @@ -43,3 +54,21 @@ func (t *TaskRunner) Run(task *Task) (*RunResult, error) {
StopTime: time.Now(),
}, nil
}

// setSkipRuns atomically stores val in the skipEnabled flag.
// SkipRunsIsEnabled treats the value 1 as "enabled"; every other value counts
// as disabled.
func (t *TaskRunner) setSkipRuns(val uint32) {
atomic.StoreUint32(&t.skipEnabled, val)
}

// SkipRuns toggles skipping of task executions.
// While it is enabled, Run() does not execute the task command and returns
// ErrTaskRunSkipped instead. Passing false re-enables normal execution.
func (t *TaskRunner) SkipRuns(enabled bool) {
	// The flag is kept as an integer: 1 means skipping is on, 0 means off.
	var flag uint32
	if enabled {
		flag = 1
	}

	t.setSkipRuns(flag)
}

// SkipRunsIsEnabled reports whether skipping of task runs is currently
// enabled. The flag is read atomically; only the value 1 counts as enabled.
func (t *TaskRunner) SkipRunsIsEnabled() bool {
	state := atomic.LoadUint32(&t.skipEnabled)

	return state == 1
}

0 comments on commit 8e59c71

Please sign in to comment.