From 8e59c71019b1728ca6fd4e791158da1cbf3ba675 Mon Sep 17 00:00:00 2001 From: Fabian Holler Date: Mon, 28 Jun 2021 10:51:12 +0200 Subject: [PATCH] run: exit gracefully when an error happens When an error happened during "baur run" execution, the command terminated immediately. It did not wait for successfully executed tasks to finish uploading their artifacts and recording the result in the database. This means results are lost and tasks have to be rerun unnecessarily in these cases. The command is changed to skip all queued task executions and wait for all uploads and db record operations to finish when an error happens instead. Task executions that are already running when an error happens finish and are not canceled. When a task run is skipped because another operation failed, it is printed with a "skipped" status to the console. --- internal/command/run.go | 146 ++++++++++++++++++++++++++++++---------- pkg/baur/taskrunner.go | 31 ++++++++- 2 files changed, 142 insertions(+), 35 deletions(-) diff --git a/internal/command/run.go b/internal/command/run.go index 78e8f16a3..b57acc851 100644 --- a/internal/command/run.go +++ b/internal/command/run.go @@ -2,9 +2,11 @@ package command import ( "context" + "errors" "fmt" "math" "strings" + "sync" "time" "github.com/spf13/cobra" @@ -64,6 +66,12 @@ The following Environment Variables are supported: term.Highlight("DOCKER_CERT_PATH"), term.Highlight("DOCKER_TLS_VERIFY")) +var ( + statusStrSuccess = term.GreenHighlight("successful") + statusStrSkipped = term.YellowHighlight("skipped") + statusStrFailed = term.RedHighlight("failed") +) + func init() { rootCmd.AddCommand(&newRunCmd().Command) } @@ -72,12 +80,12 @@ type runCmd struct { cobra.Command // Cmdline parameters - skipUpload bool - force bool - inputStr []string - lookupInputStr string - taskRunners uint - showOutput bool + skipUpload bool + force bool + inputStr []string + lookupInputStr string + taskRunnerGoRoutines uint + showOutput bool // other fields storage 
storage.Storer @@ -88,6 +96,10 @@ type runCmd struct { uploadRoutinePool *routines.Pool taskRunnerRoutinePool *routines.Pool + taskRunner *baur.TaskRunner + + terminateGracefullyOnce sync.Once + errorHappened bool } func newRunCmd() *runCmd { @@ -110,7 +122,7 @@ func newRunCmd() *runCmd { "include a string as input, can be specified multiple times") cmd.Flags().StringVar(&cmd.lookupInputStr, "lookup-input-str", "", "if a run can not be found, try to find a run with this value as input-string") - cmd.Flags().UintVarP(&cmd.taskRunners, "parallel-runs", "p", 1, + cmd.Flags().UintVarP(&cmd.taskRunnerGoRoutines, "parallel-runs", "p", 1, "specifies the max. number of tasks to run in parallel") cmd.Flags().BoolVarP(&cmd.showOutput, "show-task-output", "o", false, "show the output of tasks, if disabled the output is only shown "+ @@ -123,7 +135,7 @@ func newRunCmd() *runCmd { func (c *runCmd) run(cmd *cobra.Command, args []string) { var err error - if c.taskRunners == 0 { + if c.taskRunnerGoRoutines == 0 { stderr.Printf("--parallel-runs must be greater than 0\n") exitFunc(1) } @@ -137,7 +149,8 @@ func (c *runCmd) run(cmd *cobra.Command, args []string) { defer c.storage.Close() c.uploadRoutinePool = routines.NewPool(1) // run 1 upload in parallel with builds - c.taskRunnerRoutinePool = routines.NewPool(c.taskRunners) + c.taskRunnerRoutinePool = routines.NewPool(c.taskRunnerGoRoutines) + c.taskRunner = baur.NewTaskRunner() c.dockerClient, err = docker.NewClient(log.StdLogger.Debugf) exitOnErr(err) @@ -184,13 +197,24 @@ func (c *runCmd) run(cmd *cobra.Command, args []string) { ptCopy := pt c.taskRunnerRoutinePool.Queue(func() { task := ptCopy.task - runResult := c.runTask(task) + runResult, err := c.runTask(task) + if err != nil { + // error is printed in runTask() + c.terminateGracefully() + return + } outputs, err := baur.OutputsFromTask(c.dockerClient, task) - exitOnErrf(err, task.ID()) + if err != nil { + stderr.ErrPrintln(err, task.ID()) + c.terminateGracefully() + 
return + } if !outputsExist(task, outputs) { - exitFunc(1) + // error is printed in runTask() + c.terminateGracefully() + return } if c.skipUpload { @@ -198,14 +222,18 @@ func (c *runCmd) run(cmd *cobra.Command, args []string) { } c.uploadRoutinePool.Queue(func() { - c.uploadAndRecord(ctx, ptCopy.task, ptCopy.inputs, outputs, runResult) + err := c.uploadAndRecord(ctx, ptCopy.task, ptCopy.inputs, outputs, runResult) + if err != nil { + // error is printed in runTask() + c.terminateGracefully() + } }) }) } c.taskRunnerRoutinePool.Wait() - stdout.Println("all tasks executed, waiting for uploads to finish...") + stdout.Println("task execution finished, waiting for uploads to finish...") c.uploadRoutinePool.Wait() stdout.PrintSep() stdout.Printf("finished in: %s\n", @@ -213,19 +241,49 @@ func (c *runCmd) run(cmd *cobra.Command, args []string) { time.Since(startTime), ), ) + + if c.errorHappened { + exitFunc(1) + } } -func (c *runCmd) runTask(task *baur.Task) *baur.RunResult { - result, err := baur.NewTaskRunner().Run(task) - exitOnErrf(err, "%s", task.ID()) +func (c *runCmd) terminateGracefully() { + c.terminateGracefullyOnce.Do(func() { + c.taskRunner.SkipRuns(true) - if result.Result.ExitCode != 0 { - statusStr := term.RedHighlight("failed") + c.errorHappened = true + + stderr.Printf("%s, %s execution of queued task runs\n", + term.RedHighlight("terminating"), + term.YellowHighlight("skipping"), + ) + }) +} + +func (c *runCmd) runTask(task *baur.Task) (*baur.RunResult, error) { + result, err := c.taskRunner.Run(task) + if err != nil { + if errors.Is(err, baur.ErrTaskRunSkipped) { + stderr.Printf("%s: execution %s\n", + term.Highlight(task), + statusStrSkipped, + ) + return nil, err + } + stderr.Printf("%s: execution %s, error: %s\n", + term.Highlight(task), + statusStrFailed, + err, + ) + return nil, err + } + + if result.Result.ExitCode != 0 { if c.showOutput { stderr.Printf("%s: execution %s (%s), command exited with code %d\n", - task, - statusStr, + 
term.Highlight(task), + statusStrFailed, term.FormatDuration( result.StopTime.Sub(result.StartTime), ), @@ -233,8 +291,8 @@ func (c *runCmd) runTask(task *baur.Task) *baur.RunResult { ) } else { stderr.Printf("%s: execution %s (%s), command exited with code %d, output:\n%s\n", - task, - statusStr, + term.Highlight(task), + statusStrFailed, term.FormatDuration( result.StopTime.Sub(result.StartTime), ), @@ -242,19 +300,17 @@ func (c *runCmd) runTask(task *baur.Task) *baur.RunResult { result.StrOutput()) } - exitFunc(1) + return nil, fmt.Errorf("execution failed with exit code %d", result.ExitCode) } - statusStr := term.GreenHighlight("successful") - stdout.TaskPrintf(task, "execution %s (%s)\n", - statusStr, + statusStrSuccess, term.FormatDuration( result.StopTime.Sub(result.StartTime), ), ) - return result + return result, nil } type pendingTask struct { @@ -268,7 +324,7 @@ func (c *runCmd) uploadAndRecord( inputs *baur.Inputs, outputs []baur.Output, runResult *baur.RunResult, -) { +) error { var uploadResults []*baur.UploadResult for _, output := range outputs { @@ -280,7 +336,10 @@ func (c *runCmd) uploadAndRecord( }, func(o baur.Output, result *baur.UploadResult) { size, err := o.SizeBytes() - exitOnErrf(err, "%s: %s:", task.ID(), output) + if err != nil { + stderr.ErrPrintf(err, "%s: %s", task, output) + c.terminateGracefully() + } bps := uint64(math.Round(float64(size) / result.Stop.Sub(result.Start).Seconds())) @@ -292,14 +351,30 @@ func (c *runCmd) uploadAndRecord( uploadResults = append(uploadResults, result) }, ) - - exitOnErrf(err, "%s: %s", task.ID(), output) + if err != nil { + stderr.Printf("%s: %s: upload %s, %s\n", + term.Highlight(task), + output, + statusStrFailed, + err, + ) + return err + } } id, err := baur.StoreRun(ctx, c.storage, c.vcsState, task, inputs, runResult, uploadResults) - exitOnErrf(err, "%s", task.ID()) + if err != nil { + stderr.Printf("%s: recording build result in database %s, %s\n", + term.Highlight(task), + statusStrFailed, + 
err, + ) + return err + } stdout.TaskPrintf(task, "run stored in database with ID %s\n", term.Highlight(id)) + + return nil } func outputsExist(task *baur.Task, outputs []baur.Output) bool { @@ -312,7 +387,10 @@ func outputsExist(task *baur.Task, outputs []baur.Output) bool { for _, output := range outputs { exists, err := output.Exists() - exitOnErrf(err, "%s:", task.ID()) + if err != nil { + stderr.ErrPrintf(err, task.ID()) + return false + } if exists { size, err := output.SizeBytes() diff --git a/pkg/baur/taskrunner.go b/pkg/baur/taskrunner.go index ee5860494..53c929447 100644 --- a/pkg/baur/taskrunner.go +++ b/pkg/baur/taskrunner.go @@ -1,7 +1,9 @@ package baur import ( + "errors" "fmt" + "sync/atomic" "time" "github.com/fatih/color" @@ -9,8 +11,13 @@ import ( "github.com/simplesurance/baur/v2/internal/exec" ) +// ErrTaskRunSkipped is returned when a task run was skipped instead of executed. +var ErrTaskRunSkipped = errors.New("task run skipped") + // TaskRunner executes the command of a task. -type TaskRunner struct{} +type TaskRunner struct { + skipEnabled uint32 // must be accessed via atomic operations +} func NewTaskRunner() *TaskRunner { return &TaskRunner{} @@ -28,6 +35,10 @@ type RunResult struct { func (t *TaskRunner) Run(task *Task) (*RunResult, error) { startTime := time.Now() + if t.SkipRunsIsEnabled() { + return nil, ErrTaskRunSkipped + } + // TODO: rework exec, stream the output instead of storing all in memory execResult, err := exec.Command(task.Command[0], task.Command[1:]...). Directory(task.Directory). @@ -43,3 +54,21 @@ func (t *TaskRunner) Run(task *Task) (*RunResult, error) { StopTime: time.Now(), }, nil } + +func (t *TaskRunner) setSkipRuns(val uint32) { + atomic.StoreUint32(&t.skipEnabled, val) +} + +// SkipRuns can be enabled to skip all further executions of tasks by Run(). 
+func (t *TaskRunner) SkipRuns(enabled bool) { + if enabled { + t.setSkipRuns(1) + } else { + t.setSkipRuns(0) + } +} + +// SkipRunsIsEnabled returns true if SkipRuns is enabled. +func (t *TaskRunner) SkipRunsIsEnabled() bool { + return atomic.LoadUint32(&t.skipEnabled) == 1 +}