Skip to content

Commit

Permalink
refactor: simplify the concurrency logic
Browse files Browse the repository at this point in the history
  • Loading branch information
dimakis committed Sep 15, 2023
1 parent 0d92fca commit 2b5987c
Showing 1 changed file with 21 additions and 37 deletions.
58 changes: 21 additions & 37 deletions cmd/kar-controllers/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,15 @@ func Run(ctx context.Context, opt *options.ServerOption) error {
return fmt.Errorf("failed to create a job controller")
}

g, gCtx := errgroup.WithContext(ctx)

// Start the health and metrics servers
err = startHealthAndMetricsServers(ctx, opt)
// metrics server
metricsServer, err := NewServer(opt.MetricsListenPort, "/metrics", metrics.PrometheusHandler())
if err != nil {
return err
}

healthServer, err := NewServer(opt.HealthProbeListenPort, "/healthz", healthHandler())
if err != nil {
return err
}
Expand All @@ -86,54 +92,32 @@ func Run(ctx context.Context, opt *options.ServerOption) error {
return fmt.Errorf("failed to create a job controller")
}

// Run the job controller in a goroutine and wait for it to exit
wg := sync.WaitGroup{}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
g.Go(func() error {
defer wg.Done()
jobctrl.Run(ctx.Done())
}()

wg.Wait()


return nil
}

func healthHandler() http.Handler {
healthHandler := http.NewServeMux()
healthHandler.Handle("/healthz", &health.Handler{})
return healthHandler
}

// Starts the health probe listener
func startHealthAndMetricsServers(ctx context.Context, opt *options.ServerOption) error {
g, ctx := errgroup.WithContext(ctx)

// metrics server
metricsServer, err := NewServer(opt.MetricsListenPort, "/metrics", metrics.PrometheusHandler())
if err != nil {
return err
}

healthServer, err := NewServer(opt.HealthProbeListenPort, "/healthz", healthHandler())
if err != nil {
return err
}
jobctrl.Run(gCtx.Done())
return nil
})

g.Go(metricsServer.Start)
g.Go(healthServer.Start)

g.Go(func() error {
<-ctx.Done()
wg.Wait()
return metricsServer.Shutdown()
})

g.Go(func() error {
<-ctx.Done()
wg.Wait()
return healthServer.Shutdown()
})

return g.Wait()
return g.Wait()
}

func healthHandler() http.Handler {
healthHandler := http.NewServeMux()
healthHandler.Handle("/healthz", &health.Handler{})
return healthHandler
}

0 comments on commit 2b5987c

Please sign in to comment.