Skip to content

Commit

Permalink
Signal shutdown with cancel context
Browse files Browse the repository at this point in the history
  • Loading branch information
breml committed Feb 22, 2021
1 parent c05bfd9 commit b4209bf
Show file tree
Hide file tree
Showing 13 changed files with 215 additions and 174 deletions.
9 changes: 7 additions & 2 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,17 @@ func TestIntegration(t *testing.T) {
log := testLogger
server := daemon.New(socket, logstashPath, log)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
defer cancel()

is := is.New(t)

defer server.Cleanup()

err := server.Run()
err := server.Run(ctx)
is.NoErr(err)
}()

Expand Down Expand Up @@ -107,5 +112,5 @@ func TestIntegration(t *testing.T) {
_, err := server.Shutdown(context.Background(), &grpc.ShutdownRequest{})
is.NoErr(err)

time.Sleep(3 * time.Second)
<-ctx.Done()
}
156 changes: 96 additions & 60 deletions internal/app/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ import (
)

type Daemon struct {
// This context is passed to exec.Command. When killFunc is called,
// the child process is killed with signal kill.
killFunc context.CancelFunc

// the ctxShutdownSignal allows shutdown request, that are received over
// gRPC to signal shutdown to the shutdownSignalHandler.
ctxShutdownSignal context.Context

// shutdownSignalFunc is used by the shutdown gRPC handler to signal
// shutdown to the shutdownSignalHandler.
shutdownSignalFunc context.CancelFunc

socket string
logstashPath string

Expand All @@ -36,18 +48,6 @@ type Daemon struct {
server *grpc.Server
logstashController *controller.Controller

// This channel is closed as soon as the shutdown is in progress.
shutdownInProgress chan struct{}

// Global shutdownLogstashInstance channel, all Go routines should listen to this channel to
// get notified and safely exit on shutdownLogstashInstance of the daemon.
// The shutdownSignalHandler() will close this channel on shutdownLogstashInstance.
shutdownLogstashInstance chan struct{}

// shutdownSignal is sent by the Shutdown GRPC handler, when a shutdown command
// is received. The shutdownSignal channel is processed by the shutdownSignalHandler().
shutdownSignal chan struct{}

// Global shutdown wait group. Daemon.Run() will wait for this wait group
// before returning and exiting the main Go routine.
shutdownLogstashInstancesWG *sync.WaitGroup
Expand All @@ -59,19 +59,26 @@ type Daemon struct {

// New creates a new logstash filter verifier daemon.
func New(socket string, logstashPath string, log logging.Logger) Daemon {
ctxShutdownSignal, shutdownSignalFunc := context.WithCancel(context.Background())
return Daemon{
socket: socket,
logstashPath: logstashPath,
log: log,
shutdownInProgress: make(chan struct{}),
shutdownLogstashInstance: make(chan struct{}),
shutdownSignal: make(chan struct{}),
shutdownLogstashInstancesWG: &sync.WaitGroup{},
ctxShutdownSignal: ctxShutdownSignal,
shutdownSignalFunc: shutdownSignalFunc,
}
}

// Run starts the logstash filter verifier daemon.
func (d *Daemon) Run() error {
func (d *Daemon) Run(ctx context.Context) error {
// Two stage exit, cancel allows for graceful shutdown
// kill exits sub processes with signal kill.
ctxKill, killFunc := context.WithCancel(ctx)
d.killFunc = killFunc
ctx, shutdown := context.WithCancel(ctxKill)
defer shutdown()

tempdir, err := ioutil.TempDir("", "lfv-")
if err != nil {
return err
Expand All @@ -81,72 +88,97 @@ func (d *Daemon) Run() error {

// Create and start Logstash Controller
d.shutdownLogstashInstancesWG.Add(1)
instance := logstash.New(d.logstashPath, d.log, d.shutdownLogstashInstance, d.shutdownLogstashInstancesWG)
logstashController, err := controller.NewController(instance, tempdir, d.log, d.shutdownLogstashInstance)
instance := logstash.New(ctxKill, d.logstashPath, d.log, d.shutdownLogstashInstancesWG)
logstashController, err := controller.NewController(instance, tempdir, d.log)
if err != nil {
return err
}
d.logstashController = logstashController

err = d.logstashController.Launch()
err = d.logstashController.Launch(ctx)
if err != nil {
return err
}

// Create Session Handler
d.sessionController = session.NewController(d.tempdir, d.logstashController, d.log)

// Setup signal handler and shutdown coordinator
shutdownHandlerCompleted := make(chan struct{})
go d.shutdownSignalHandler(shutdownHandlerCompleted)

// Create and start GRPC Server
lis, err := net.Listen("unix", d.socket)
if err != nil {
return err
}
d.server = grpc.NewServer()
pb.RegisterControlServer(d.server, d)
go func() {
d.log.Infof("Daemon listening on %s", d.socket)
err = d.server.Serve(lis)
if err != nil {
d.log.Error("failed to start daemon: %v", err)
shutdown()
}
}()

d.log.Infof("Daemon listening on %s", d.socket)
err = d.server.Serve(lis)

// This is called from the main Go routine, so we have to wait for all others
// to shutdown, before we can return and end the program/daemon.
<-shutdownHandlerCompleted
// Setup signal handler and shutdown coordinator
d.shutdownSignalHandler(shutdown)

return err
return nil
}

func (d *Daemon) shutdownSignalHandler(shutdownHandlerCompleted chan struct{}) {
// Make sure, shutdownHandlerCompleted channel is closed and main Go routine
// exits cleanly.
defer close(shutdownHandlerCompleted)
const hardExitDelay = 20 * time.Millisecond

func (d *Daemon) shutdownSignalHandler(shutdown func()) {
var hardExit bool

defer func() {
d.killFunc()
if hardExit {
// Give a little time to propagate Done to kill context
time.Sleep(hardExitDelay)
err := os.Remove(d.socket)
if err != nil && !os.IsNotExist(err) {
d.log.Warningf("failed to remove socket file %s during hard exit: %v", d.socket, err)
}
}
}()

// Listen to shutdown signal (comming from shutdown GRPC requests) as well
// Listen to shutdown signal (coming from shutdown GRPC requests) as well
// as OS signals interrupt and SIGTERM (not present on all systems).
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
sigInt := make(chan os.Signal, 10)
signal.Notify(sigInt, os.Interrupt)
sigTerm := make(chan os.Signal, 10)
signal.Notify(sigTerm, syscall.SIGTERM)

select {
case <-d.shutdownSignal:
case <-c:
case <-d.ctxShutdownSignal.Done():
case <-sigInt:
d.log.Info("Interrupt signal (Ctrl+c) received. Shutdown initiated.")
d.log.Info("Press Ctrl+c again to exit immediately")
case <-sigTerm:
d.log.Info("Term signal received. Shutdown initiated.")
}

// Shutdown signal or OS signal received, start shutdown procedure
// Signal shutdown to all
close(d.shutdownInProgress)
close(d.shutdownSignal)

// TODO: Make shutdown timeout configurable
t := time.NewTimer(3 * time.Second)
t := time.NewTimer(10 * time.Second)

// Wait for currently running sessions to finish.
select {
case <-d.sessionController.WaitFinish():
t.Stop()
case <-t.C:
d.log.Debug("Wait for sessions timed out")
case <-sigInt:
d.log.Debug("Double interrupt signal received, exit now")
hardExit = true
return
}
// Stop timer and drain channel
if !t.Stop() {
select {
case <-t.C:
default:
}
}
shutdown()

// Stop accepting new connections, wait for currently running handlers to finish properly.
serverStopped := make(chan struct{})
Expand All @@ -158,13 +190,12 @@ func (d *Daemon) shutdownSignalHandler(shutdownHandlerCompleted chan struct{}) {
// Stop Logstash instance
logstashInstancesStopped := make(chan struct{})
go func() {
close(d.shutdownLogstashInstance)
d.shutdownLogstashInstancesWG.Wait()
close(logstashInstancesStopped)
}()

// TODO: Make shutdown timeout configurable
t.Reset(3 * time.Second)
t.Reset(10 * time.Second)

// Wait for Logstash and GRPC Server to shutdown
serverStopComplete := false
Expand All @@ -184,9 +215,19 @@ func (d *Daemon) shutdownSignalHandler(shutdownHandlerCompleted chan struct{}) {
d.log.Debug("logstash instance successfully stopped.")
logstashInstanceStopComplete = true
logstashInstancesStopped = nil
case <-sigInt:
d.log.Debug("Double interrupt signal received, exit now")
hardExit = true
return
}
}
// Stop timer and drain channel
if !t.Stop() {
select {
case <-t.C:
default:
}
}
t.Stop()
}

// Cleanup removes the temporary files created by the daemon.
Expand All @@ -200,27 +241,22 @@ func (d *Daemon) Cleanup() {
// Shutdown signals the daemon to shutdown.
func (d *Daemon) Shutdown(ctx context.Context, in *pb.ShutdownRequest) (*pb.ShutdownResponse, error) {
select {
case d.shutdownSignal <- struct{}{}:
case <-d.ctxShutdownSignal.Done():
return nil, errors.New("daemon is already shutting down")
default:
d.shutdownSignalFunc()
}

return &pb.ShutdownResponse{}, nil
}

func (d *Daemon) isShutdownInProgress() bool {
select {
case <-d.shutdownInProgress:
return true
default:
}
return false
}

// SetupTest creates a new session, receives the pipeline configuration
// (zip archive), and prepares the files for the new session.
func (d *Daemon) SetupTest(ctx context.Context, in *pb.SetupTestRequest) (*pb.SetupTestResponse, error) {
if d.isShutdownInProgress() {
select {
case <-d.ctxShutdownSignal.Done():
return nil, errors.New("daemon is shutting down, no new sessions accepted")
default:
}

pipelines, configFiles, err := d.extractZip(in.Pipeline)
Expand Down
4 changes: 3 additions & 1 deletion internal/app/daemon_start.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package app

import (
"context"

"github.com/spf13/cobra"
"github.com/spf13/viper"

Expand Down Expand Up @@ -32,5 +34,5 @@ func runDaemonStart(_ *cobra.Command, _ []string) error {
s := daemon.New(socket, logstashPath, log)
defer s.Cleanup()

return s.Run()
return s.Run(context.Background())
}
15 changes: 7 additions & 8 deletions internal/daemon/controller/controller.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controller

import (
"context"
"io/ioutil"
"os"
"path"
Expand All @@ -16,21 +17,21 @@ import (
const LogstashInstanceDirectoryPrefix = "logstash-instance"

type Controller struct {
ctx context.Context

id string

workDir string
log logging.Logger

shutdown chan struct{}

instance Instance

stateMachine *stateMachine
receivedEvents *events
pipelines *pipelines
}

func NewController(instance Instance, baseDir string, log logging.Logger, shutdown chan struct{}) (*Controller, error) {
func NewController(instance Instance, baseDir string, log logging.Logger) (*Controller, error) {
id := idgen.New()

workDir := path.Join(baseDir, LogstashInstanceDirectoryPrefix, id)
Expand Down Expand Up @@ -64,10 +65,8 @@ func NewController(instance Instance, baseDir string, log logging.Logger, shutdo
id: id,
workDir: workDir,
log: log,
shutdown: shutdown,
instance: instance,

stateMachine: newStateMachine(shutdown, log),
receivedEvents: newEvents(),
pipelines: newPipelines(),
}
Expand All @@ -84,13 +83,13 @@ func (c *Controller) ID() string {
return c.id
}

func (c *Controller) Launch() error {
func (c *Controller) Launch(ctx context.Context) error {
c.pipelines.reset("stdin", "output")
c.stateMachine = newStateMachine(ctx, c.log)
c.stateMachine.executeCommand(commandStart)

err := c.instance.Start(c, c.workDir)
err := c.instance.Start(ctx, c, c.workDir)
if err != nil {
c.instance.Shutdown()
return err
}

Expand Down
Loading

0 comments on commit b4209bf

Please sign in to comment.