Skip to content

Commit

Permalink
perf: 分离Application start stop 通用逻辑
Browse files Browse the repository at this point in the history
  • Loading branch information
TBXark committed Nov 7, 2024
1 parent 393a286 commit 2d70a55
Showing 1 changed file with 48 additions and 47 deletions.
95 changes: 48 additions & 47 deletions pkg/utils/boot/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package boot

import (
"context"
"fmt"
"github.com/tbxark/sphere/pkg/log"
"github.com/tbxark/sphere/pkg/log/logfields"
"golang.org/x/sync/errgroup"
Expand All @@ -23,60 +24,60 @@ func NewApplication(tasks ...Task) *Application {
}
}

func (a *Application) Run(ctx context.Context) error {
wg, ctx := errgroup.WithContext(ctx)
for _, task := range a.tasks {
log.Infof("runner %s start", task.Identifier())
runner := task
wg.Go(func() error {
defer func() {
if r := recover(); r != nil {
log.Errorw(
"runner panic",
logfields.String("runner", runner.Identifier()),
logfields.Any("recover", r),
)
}
}()
if err := runner.Start(ctx); err != nil {
func createTask(ctx context.Context, task Task, action string, taskFunc func(Task, context.Context) error) func() error {
return func() (err error) {
defer func() {
if r := recover(); r != nil {
log.Errorw(
"runner error",
logfields.String("runner", runner.Identifier()),
logfields.Error(err),
fmt.Sprintf("%s panic", action),
logfields.String("task", task.Identifier()),
logfields.Any("recover", r),
)
return err
err = fmt.Errorf("%s %s panic: %v", action, task.Identifier(), r)
}
return nil
})
}()
err = taskFunc(task, ctx)
if err != nil {
log.Errorw(
fmt.Sprintf("%s error", action),
logfields.String("task", task.Identifier()),
logfields.Error(err),
)
}
return
}
return wg.Wait()
}

func (a *Application) Close(ctx context.Context) error {
wg := errgroup.Group{}
func (a *Application) newErrorGroup(ctx context.Context, stopOnError bool) (*errgroup.Group, context.Context) {
group, errCtx := errgroup.WithContext(ctx)
if stopOnError {
return group, errCtx
}
return group, ctx
}

func (a *Application) executeTasks(ctx context.Context, action string, stopOnError bool, taskFunc func(Task, context.Context) error) error {
wg, gCtx := a.newErrorGroup(ctx, stopOnError)
for _, task := range a.tasks {
log.Infof("closer %s start", task.Identifier())
closer := task
wg.Go(func() error {
defer func() {
if r := recover(); r != nil {
log.Errorw(
"closer panic",
logfields.String("closer", closer.Identifier()),
logfields.Any("recover", r),
)
}
}()
if err := closer.Stop(ctx); err != nil {
log.Errorw(
"closer error",
logfields.String("closer", closer.Identifier()),
logfields.Error(err),
)
return err
}
return nil
})
log.Infof("%s %s", action, task.Identifier())
if gCtx.Err() != nil {
log.Infof("skip %s %s", action, task.Identifier())
}
wg.Go(createTask(gCtx, task, action, taskFunc))
}
return wg.Wait()
}

func (a *Application) Run(ctx context.Context) error {
// run 操作会因为一个任务失败而中断
return a.executeTasks(ctx, "start", true, func(t Task, ctx context.Context) error {
return t.Start(ctx)
})
}

func (a *Application) Close(ctx context.Context) error {
// close 操作不会因为一个任务失败而中断
return a.executeTasks(ctx, "stop", false, func(t Task, ctx context.Context) error {
return t.Stop(ctx)
})
}

0 comments on commit 2d70a55

Please sign in to comment.