From b4209bfaea662c8c6cb7f707631c814634ca8112 Mon Sep 17 00:00:00 2001 From: Lucas Bremgartner Date: Mon, 22 Feb 2021 14:08:39 +0100 Subject: [PATCH] Signal shutdown with cancel context --- integration_test.go | 9 +- internal/app/daemon/daemon.go | 156 +++++++++++------- internal/app/daemon_start.go | 4 +- internal/daemon/controller/controller.go | 15 +- internal/daemon/controller/controller_test.go | 30 ++-- internal/daemon/controller/instance.go | 5 +- internal/daemon/controller/statemachine.go | 15 +- .../daemon/instance/logstash/dummy_reader.go | 9 +- internal/daemon/instance/logstash/instance.go | 41 +++-- .../daemon/instance/logstash/processors.go | 14 +- .../instance/mock/logstash_instance_mock.go | 59 +++---- internal/daemon/session/controller.go | 8 +- .../session/logstash_controller_mock_test.go | 24 ++- 13 files changed, 215 insertions(+), 174 deletions(-) diff --git a/integration_test.go b/integration_test.go index 25245eb..ec9d316 100644 --- a/integration_test.go +++ b/integration_test.go @@ -49,12 +49,17 @@ func TestIntegration(t *testing.T) { log := testLogger server := daemon.New(socket, logstashPath, log) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer cancel() + is := is.New(t) defer server.Cleanup() - err := server.Run() + err := server.Run(ctx) is.NoErr(err) }() @@ -107,5 +112,5 @@ func TestIntegration(t *testing.T) { _, err := server.Shutdown(context.Background(), &grpc.ShutdownRequest{}) is.NoErr(err) - time.Sleep(3 * time.Second) + <-ctx.Done() } diff --git a/internal/app/daemon/daemon.go b/internal/app/daemon/daemon.go index 70e57ef..c7dbf59 100644 --- a/internal/app/daemon/daemon.go +++ b/internal/app/daemon/daemon.go @@ -26,6 +26,18 @@ import ( ) type Daemon struct { + // This context is passed to exec.Command. When killFunc is called, + // the child process is killed with signal kill. + killFunc context.CancelFunc + + // the ctxShutdownSignal allows shutdown request, that are received over + // gRPC to signal shutdown to the shutdownSignalHandler. + ctxShutdownSignal context.Context + + // shutdownSignalFunc is used by the shutdown gRPC handler to signal + // shutdown to the shutdownSignalHandler. + shutdownSignalFunc context.CancelFunc + socket string logstashPath string @@ -36,18 +48,6 @@ type Daemon struct { server *grpc.Server logstashController *controller.Controller - // This channel is closed as soon as the shutdown is in progress. - shutdownInProgress chan struct{} - - // Global shutdownLogstashInstance channel, all Go routines should listen to this channel to - // get notified and safely exit on shutdownLogstashInstance of the daemon. - // The shutdownSignalHandler() will close this channel on shutdownLogstashInstance. - shutdownLogstashInstance chan struct{} - - // shutdownSignal is sent by the Shutdown GRPC handler, when a shutdown command - // is received. The shutdownSignal channel is processed by the shutdownSignalHandler(). - shutdownSignal chan struct{} - // Global shutdown wait group. Daemon.Run() will wait for this wait group // before returning and exiting the main Go routine. shutdownLogstashInstancesWG *sync.WaitGroup @@ -59,19 +59,26 @@ type Daemon struct { // New creates a new logstash filter verifier daemon. func New(socket string, logstashPath string, log logging.Logger) Daemon { + ctxShutdownSignal, shutdownSignalFunc := context.WithCancel(context.Background()) return Daemon{ socket: socket, logstashPath: logstashPath, log: log, - shutdownInProgress: make(chan struct{}), - shutdownLogstashInstance: make(chan struct{}), - shutdownSignal: make(chan struct{}), shutdownLogstashInstancesWG: &sync.WaitGroup{}, + ctxShutdownSignal: ctxShutdownSignal, + shutdownSignalFunc: shutdownSignalFunc, } } // Run starts the logstash filter verifier daemon. -func (d *Daemon) Run() error { +func (d *Daemon) Run(ctx context.Context) error { + // Two stage exit, cancel allows for graceful shutdown + // kill exits sub processes with signal kill. + ctxKill, killFunc := context.WithCancel(ctx) + d.killFunc = killFunc + ctx, shutdown := context.WithCancel(ctxKill) + defer shutdown() + tempdir, err := ioutil.TempDir("", "lfv-") if err != nil { return err @@ -81,14 +88,14 @@ func (d *Daemon) Run() error { // Create and start Logstash Controller d.shutdownLogstashInstancesWG.Add(1) - instance := logstash.New(d.logstashPath, d.log, d.shutdownLogstashInstance, d.shutdownLogstashInstancesWG) - logstashController, err := controller.NewController(instance, tempdir, d.log, d.shutdownLogstashInstance) + instance := logstash.New(ctxKill, d.logstashPath, d.log, d.shutdownLogstashInstancesWG) + logstashController, err := controller.NewController(instance, tempdir, d.log) if err != nil { return err } d.logstashController = logstashController - err = d.logstashController.Launch() + err = d.logstashController.Launch(ctx) if err != nil { return err } @@ -96,10 +103,6 @@ func (d *Daemon) Run() error { // Create Session Handler d.sessionController = session.NewController(d.tempdir, d.logstashController, d.log) - // Setup signal handler and shutdown coordinator - shutdownHandlerCompleted := make(chan struct{}) - go d.shutdownSignalHandler(shutdownHandlerCompleted) - // Create and start GRPC Server lis, err := net.Listen("unix", d.socket) if err != nil { @@ -107,46 +110,75 @@ func (d *Daemon) Run() error { } d.server = grpc.NewServer() pb.RegisterControlServer(d.server, d) + go func() { + d.log.Infof("Daemon listening on %s", d.socket) + err = d.server.Serve(lis) + if err != nil { + d.log.Error("failed to start daemon: %v", err) + shutdown() + } + }() - d.log.Infof("Daemon listening on %s", d.socket) - err = d.server.Serve(lis) - - // This is called from the main Go routine, so we have to wait for all others - // to shutdown, before we can return and end the program/daemon. - <-shutdownHandlerCompleted + // Setup signal handler and shutdown coordinator + d.shutdownSignalHandler(shutdown) - return err + return nil } -func (d *Daemon) shutdownSignalHandler(shutdownHandlerCompleted chan struct{}) { - // Make sure, shutdownHandlerCompleted channel is closed and main Go routine - // exits cleanly. - defer close(shutdownHandlerCompleted) +const hardExitDelay = 20 * time.Millisecond + +func (d *Daemon) shutdownSignalHandler(shutdown func()) { + var hardExit bool + + defer func() { + d.killFunc() + if hardExit { + // Give a little time to propagate Done to kill context + time.Sleep(hardExitDelay) + err := os.Remove(d.socket) + if err != nil && !os.IsNotExist(err) { + d.log.Warningf("failed to remove socket file %s during hard exit: %v", d.socket, err) + } + } + }() - // Listen to shutdown signal (comming from shutdown GRPC requests) as well + // Listen to shutdown signal (coming from shutdown GRPC requests) as well // as OS signals interrupt and SIGTERM (not present on all systems). - c := make(chan os.Signal, 2) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) + sigInt := make(chan os.Signal, 10) + signal.Notify(sigInt, os.Interrupt) + sigTerm := make(chan os.Signal, 10) + signal.Notify(sigTerm, syscall.SIGTERM) select { - case <-d.shutdownSignal: - case <-c: + case <-d.ctxShutdownSignal.Done(): + case <-sigInt: + d.log.Info("Interrupt signal (Ctrl+c) received. Shutdown initiated.") + d.log.Info("Press Ctrl+c again to exit immediately") + case <-sigTerm: + d.log.Info("Term signal received. Shutdown initiated.") } - // Shutdown signal or OS signal received, start shutdown procedure - // Signal shutdown to all - close(d.shutdownInProgress) - close(d.shutdownSignal) - // TODO: Make shutdown timeout configurable - t := time.NewTimer(3 * time.Second) + t := time.NewTimer(10 * time.Second) // Wait for currently running sessions to finish. select { case <-d.sessionController.WaitFinish(): - t.Stop() case <-t.C: + d.log.Debug("Wait for sessions timed out") + case <-sigInt: + d.log.Debug("Double interrupt signal received, exit now") + hardExit = true + return } + // Stop timer and drain channel + if !t.Stop() { + select { + case <-t.C: + default: + } + } + shutdown() // Stop accepting new connections, wait for currently running handlers to finish properly. serverStopped := make(chan struct{}) @@ -158,13 +190,12 @@ func (d *Daemon) shutdownSignalHandler(shutdownHandlerCompleted chan struct{}) { // Stop Logstash instance logstashInstancesStopped := make(chan struct{}) go func() { - close(d.shutdownLogstashInstance) d.shutdownLogstashInstancesWG.Wait() close(logstashInstancesStopped) }() // TODO: Make shutdown timeout configurable - t.Reset(3 * time.Second) + t.Reset(10 * time.Second) // Wait for Logstash and GRPC Server to shutdown serverStopComplete := false @@ -184,9 +215,19 @@ func (d *Daemon) shutdownSignalHandler(shutdownHandlerCompleted chan struct{}) { d.log.Debug("logstash instance successfully stopped.") logstashInstanceStopComplete = true logstashInstancesStopped = nil + case <-sigInt: + d.log.Debug("Double interrupt signal received, exit now") + hardExit = true + return + } + } + // Stop timer and drain channel + if !t.Stop() { + select { + case <-t.C: + default: } } - t.Stop() } // Cleanup removes the temporary files created by the daemon. @@ -200,27 +241,22 @@ func (d *Daemon) Cleanup() { // Shutdown signals the daemon to shutdown. func (d *Daemon) Shutdown(ctx context.Context, in *pb.ShutdownRequest) (*pb.ShutdownResponse, error) { select { - case d.shutdownSignal <- struct{}{}: + case <-d.ctxShutdownSignal.Done(): + return nil, errors.New("daemon is already shutting down") default: + d.shutdownSignalFunc() } return &pb.ShutdownResponse{}, nil } -func (d *Daemon) isShutdownInProgress() bool { - select { - case <-d.shutdownInProgress: - return true - default: - } - return false -} - // SetupTest creates a new session, receives the pipeline configuration // (zip archive), and prepares the files for the new session. func (d *Daemon) SetupTest(ctx context.Context, in *pb.SetupTestRequest) (*pb.SetupTestResponse, error) { - if d.isShutdownInProgress() { + select { + case <-d.ctxShutdownSignal.Done(): return nil, errors.New("daemon is shutting down, no new sessions accepted") + default: } pipelines, configFiles, err := d.extractZip(in.Pipeline) diff --git a/internal/app/daemon_start.go b/internal/app/daemon_start.go index 765a37c..fa623c9 100644 --- a/internal/app/daemon_start.go +++ b/internal/app/daemon_start.go @@ -1,6 +1,8 @@ package app import ( + "context" + "github.com/spf13/cobra" "github.com/spf13/viper" @@ -32,5 +34,5 @@ func runDaemonStart(_ *cobra.Command, _ []string) error { s := daemon.New(socket, logstashPath, log) defer s.Cleanup() - return s.Run() + return s.Run(context.Background()) } diff --git a/internal/daemon/controller/controller.go b/internal/daemon/controller/controller.go index 2fd6394..21f5ca0 100644 --- a/internal/daemon/controller/controller.go +++ b/internal/daemon/controller/controller.go @@ -1,6 +1,7 @@ package controller import ( + "context" "io/ioutil" "os" "path" @@ -16,13 +17,13 @@ import ( const LogstashInstanceDirectoryPrefix = "logstash-instance" type Controller struct { + ctx context.Context + id string workDir string log logging.Logger - shutdown chan struct{} - instance Instance stateMachine *stateMachine @@ -30,7 +31,7 @@ type Controller struct { pipelines *pipelines } -func NewController(instance Instance, baseDir string, log logging.Logger, shutdown chan struct{}) (*Controller, error) { +func NewController(instance Instance, baseDir string, log logging.Logger) (*Controller, error) { id := idgen.New() workDir := path.Join(baseDir, LogstashInstanceDirectoryPrefix, id) @@ -64,10 +65,8 @@ func NewController(instance Instance, baseDir string, log logging.Logger, shutdo id: id, workDir: workDir, log: log, - shutdown: shutdown, instance: instance, - stateMachine: newStateMachine(shutdown, log), receivedEvents: newEvents(), pipelines: newPipelines(), } @@ -84,13 +83,13 @@ func (c *Controller) ID() string { return c.id } -func (c *Controller) Launch() error { +func (c *Controller) Launch(ctx context.Context) error { c.pipelines.reset("stdin", "output") + c.stateMachine = newStateMachine(ctx, c.log) c.stateMachine.executeCommand(commandStart) - err := c.instance.Start(c, c.workDir) + err := c.instance.Start(ctx, c, c.workDir) if err != nil { - c.instance.Shutdown() return err } diff --git a/internal/daemon/controller/controller_test.go b/internal/daemon/controller/controller_test.go index 316d6a4..59ec811 100644 --- a/internal/daemon/controller/controller_test.go +++ b/internal/daemon/controller/controller_test.go @@ -1,6 +1,7 @@ package controller_test import ( + "context" "errors" "path" "testing" @@ -29,7 +30,7 @@ func TestNewController(t *testing.T) { tempdir := t.TempDir() - c, err := controller.NewController(nil, tempdir, logging.NoopLogger, nil) + c, err := controller.NewController(nil, tempdir, logging.NoopLogger) is.NoErr(err) is.True(file.Exists(path.Join(tempdir, controller.LogstashInstanceDirectoryPrefix, c.ID(), "logstash.yml"))) // logstash.yml @@ -64,18 +65,17 @@ func TestLaunch(t *testing.T) { is := is.New(t) instance := &mock.InstanceMock{ - StartFunc: func(controllerMoqParam *controller.Controller, workdir string) error { + StartFunc: func(ctx context.Context, controllerMoqParam *controller.Controller, workdir string) error { return test.instanceStartErr }, - ShutdownFunc: func() {}, } tempdir := t.TempDir() - c, err := controller.NewController(instance, tempdir, logging.NoopLogger, nil) + c, err := controller.NewController(instance, tempdir, logging.NoopLogger) is.NoErr(err) - err = c.Launch() + err = c.Launch(context.Background()) is.True(err != nil == test.wantErr) // Launch error }) } @@ -95,7 +95,7 @@ func TestCompleteCycle(t *testing.T) { is := is.New(t) instance := &mock.InstanceMock{ - StartFunc: func(controllerMoqParam *controller.Controller, workdir string) error { + StartFunc: func(ctx context.Context, controllerMoqParam *controller.Controller, workdir string) error { return nil }, ConfigReloadFunc: func() error { @@ -105,12 +105,10 @@ func TestCompleteCycle(t *testing.T) { tempdir := t.TempDir() - shutdown := make(chan struct{}) - - c, err := controller.NewController(instance, tempdir, logging.NoopLogger, shutdown) + c, err := controller.NewController(instance, tempdir, logging.NoopLogger) is.NoErr(err) - err = c.Launch() + err = c.Launch(context.Background()) is.NoErr(err) // Simulate pipelines ready from instance @@ -182,7 +180,7 @@ func TestSetupTest_Shutdown(t *testing.T) { is := is.New(t) instance := &mock.InstanceMock{ - StartFunc: func(controllerMoqParam *controller.Controller, workdir string) error { + StartFunc: func(ctx context.Context, controllerMoqParam *controller.Controller, workdir string) error { return nil }, ConfigReloadFunc: func() error { @@ -191,16 +189,16 @@ func TestSetupTest_Shutdown(t *testing.T) { } tempdir := t.TempDir() - shutdown := make(chan struct{}) - c, err := controller.NewController(instance, tempdir, logging.NoopLogger, shutdown) + c, err := controller.NewController(instance, tempdir, logging.NoopLogger) is.NoErr(err) - err = c.Launch() + ctx, cancel := context.WithCancel(context.Background()) + err = c.Launch(ctx) is.NoErr(err) - // Simulate shutdown signal - close(shutdown) + // signal shutdown + cancel() pipelines := pipeline.Pipelines{ pipeline.Pipeline{ diff --git a/internal/daemon/controller/instance.go b/internal/daemon/controller/instance.go index 1f6dbe8..b16a2f8 100644 --- a/internal/daemon/controller/instance.go +++ b/internal/daemon/controller/instance.go @@ -1,9 +1,10 @@ package controller +import "context" + //go:generate moq -fmt goimports -pkg mock -out ../instance/mock/logstash_instance_mock.go . Instance type Instance interface { - Start(controller *Controller, workdir string) error - Shutdown() + Start(ctx context.Context, controller *Controller, workdir string) error ConfigReload() error } diff --git a/internal/daemon/controller/statemachine.go b/internal/daemon/controller/statemachine.go index 6320ab1..99b3d67 100644 --- a/internal/daemon/controller/statemachine.go +++ b/internal/daemon/controller/statemachine.go @@ -1,6 +1,7 @@ package controller import ( + "context" "sync" "github.com/pkg/errors" @@ -9,27 +10,29 @@ import ( ) type stateMachine struct { + ctx context.Context + currentState stateName mutex *sync.Mutex cond *sync.Cond - shutdown chan struct{} - log logging.Logger + log logging.Logger } -func newStateMachine(shutdown chan struct{}, log logging.Logger) *stateMachine { +func newStateMachine(ctx context.Context, log logging.Logger) *stateMachine { mu := &sync.Mutex{} cond := sync.NewCond(mu) go func() { - <-shutdown + <-ctx.Done() log.Debug("broadcast shutdown for waitForState") cond.Broadcast() }() return &stateMachine{ + ctx: ctx, + currentState: stateCreated, mutex: mu, cond: cond, - shutdown: shutdown, log: log, } } @@ -45,7 +48,7 @@ func (s *stateMachine) waitForState(target stateName) error { s.cond.Wait() select { - case <-s.shutdown: + case <-s.ctx.Done(): // TODO: Can we do this without error return? return errors.Errorf("shutdown while waiting for state: %s", target) default: diff --git a/internal/daemon/instance/logstash/dummy_reader.go b/internal/daemon/instance/logstash/dummy_reader.go index 2be6dc6..88fba1f 100644 --- a/internal/daemon/instance/logstash/dummy_reader.go +++ b/internal/daemon/instance/logstash/dummy_reader.go @@ -1,6 +1,9 @@ package logstash -import "io" +import ( + "context" + "io" +) // stdinBlockReader implements the io.Reader interface and blocks reading // until the shutdown channel unblocks (close of channel). @@ -10,10 +13,10 @@ import "io" // This stdinBlockReader is used to block stdin of the controlled // Logstash instance. type stdinBlockReader struct { - shutdown chan struct{} + ctx context.Context } func (s *stdinBlockReader) Read(_ []byte) (int, error) { - <-s.shutdown + <-s.ctx.Done() return 0, io.EOF } diff --git a/internal/daemon/instance/logstash/instance.go b/internal/daemon/instance/logstash/instance.go index a5ce42b..342e412 100644 --- a/internal/daemon/instance/logstash/instance.go +++ b/internal/daemon/instance/logstash/instance.go @@ -1,6 +1,7 @@ package logstash import ( + "context" "os" "os/exec" "sync" @@ -14,6 +15,9 @@ import ( ) type instance struct { + ctxKill context.Context + ctxShutdown context.Context + controller *controller.Controller command string @@ -22,18 +26,16 @@ type instance struct { log logging.Logger logstashStarted chan struct{} - shutdown chan struct{} - instanceShutdown chan struct{} logstashShutdownWG *sync.WaitGroup shutdownWG *sync.WaitGroup } -func New(command string, log logging.Logger, shutdown chan struct{}, shutdownWG *sync.WaitGroup) controller.Instance { +func New(ctxKill context.Context, command string, log logging.Logger, shutdownWG *sync.WaitGroup) controller.Instance { return &instance{ + ctxKill: ctxKill, command: command, log: log, logstashStarted: make(chan struct{}), - shutdown: shutdown, logstashShutdownWG: &sync.WaitGroup{}, shutdownWG: shutdownWG, } @@ -41,7 +43,17 @@ func New(command string, log logging.Logger, shutdown chan struct{}, shutdownWG // start starts a Logstash child process with the previously supplied // configuration. -func (i *instance) Start(controller *controller.Controller, workdir string) error { +func (i *instance) Start(ctx context.Context, controller *controller.Controller, workdir string) (err error) { + ctx, cancel := context.WithCancel(ctx) + i.ctxShutdown = ctx + defer func() { + // if there has been an error during Start, cancel the context to signal + // shutdown to all potentially running Go routines of instance. + if err != nil { + cancel() + } + }() + i.controller = controller args := []string{ @@ -49,7 +61,7 @@ func (i *instance) Start(controller *controller.Controller, workdir string) erro workdir, } - i.child = exec.Command(i.command, args...) + i.child = exec.CommandContext(i.ctxKill, i.command, args...) // nolint: gosec stdout, err := i.child.StdoutPipe() if err != nil { return errors.Wrap(err, "failed to setup stdoutPipe") @@ -58,10 +70,12 @@ func (i *instance) Start(controller *controller.Controller, workdir string) erro if err != nil { return errors.Wrap(err, "failed to setup stdoutPipe") } - i.instanceShutdown = make(chan struct{}) i.child.Stdin = &stdinBlockReader{ - shutdown: i.instanceShutdown, + ctx: i.ctxShutdown, } + // Ensure a separate process group id for the Logstash child process, such + // that signals like interrupt are not propagated automatically. + i.child.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} i.logstashShutdownWG.Add(2) go i.stdoutProcessor(stdout) @@ -89,17 +103,9 @@ func (i *instance) Start(controller *controller.Controller, workdir string) erro return nil } -// TODO: What is needed for what? func (i *instance) shutdownSignalHandler() { // Wait for shutdown signal coming from the daemon. - <-i.shutdown - - i.Shutdown() -} - -// TODO: What is needed for what? -func (i *instance) Shutdown() { - close(i.instanceShutdown) + <-i.ctxShutdown.Done() i.stopLogstash() @@ -118,7 +124,6 @@ func (i *instance) stopLogstash() { i.log.Errorf("failed to send SIGTERM, Logstash might already be down:", err) } - // TODO: Add timeout, then send syscall.SIGKILL err = i.child.Wait() if err != nil { i.log.Errorf("failed to wait for child process: %v", err) diff --git a/internal/daemon/instance/logstash/processors.go b/internal/daemon/instance/logstash/processors.go index 25a607e..fc4f4f9 100644 --- a/internal/daemon/instance/logstash/processors.go +++ b/internal/daemon/instance/logstash/processors.go @@ -15,7 +15,7 @@ func (i *instance) stdoutProcessor(stdout io.ReadCloser) { // The stdoutProcessor can only be started after the process is created. select { case <-i.logstashStarted: - case <-i.instanceShutdown: + case <-i.ctxShutdown.Done(): return } @@ -42,9 +42,9 @@ func (i *instance) stdoutProcessor(stdout io.ReadCloser) { // Termination of stdout scanner is only expected, if shutdown is in progress. select { - case <-i.instanceShutdown: + case <-i.ctxShutdown.Done(): default: - i.log.Warning("stdout scanner closed unexpectetly") + i.log.Warning("stdout scanner closed unexpectedly") } i.log.Debug("exit stdout scanner") @@ -56,7 +56,7 @@ func (i *instance) stderrProcessor(stderr io.ReadCloser) { // The stderrProcessor can only be started after the process is created. select { case <-i.logstashStarted: - case <-i.instanceShutdown: + case <-i.ctxShutdown.Done(): return } @@ -72,9 +72,9 @@ func (i *instance) stderrProcessor(stderr io.ReadCloser) { // Termination of stderr scanner is only expected, if shutdown is in progress. select { - case <-i.instanceShutdown: + case <-i.ctxShutdown.Done(): default: - i.log.Warning("stderr scanner closed unexpectetly") + i.log.Warning("stderr scanner closed unexpectedly") } i.log.Debug("exit stderr scanner") @@ -99,7 +99,7 @@ func (i *instance) logstashLogProcessor(t *tail.Tail) { i.controller.PipelinesReady(runningPipelines...) } - case <-i.instanceShutdown: + case <-i.ctxShutdown.Done(): i.log.Debug("shutdown log reader") return } diff --git a/internal/daemon/instance/mock/logstash_instance_mock.go b/internal/daemon/instance/mock/logstash_instance_mock.go index 58b6b89..63c4ca4 100644 --- a/internal/daemon/instance/mock/logstash_instance_mock.go +++ b/internal/daemon/instance/mock/logstash_instance_mock.go @@ -4,6 +4,7 @@ package mock import ( + "context" "sync" "github.com/magnusbaeck/logstash-filter-verifier/v2/internal/daemon/controller" @@ -22,10 +23,7 @@ var _ controller.Instance = &InstanceMock{} // ConfigReloadFunc: func() error { // panic("mock out the ConfigReload method") // }, -// ShutdownFunc: func() { -// panic("mock out the Shutdown method") -// }, -// StartFunc: func(controllerMoqParam *controller.Controller, workdir string) error { +// StartFunc: func(ctx context.Context, controllerMoqParam *controller.Controller, workdir string) error { // panic("mock out the Start method") // }, // } @@ -38,20 +36,18 @@ type InstanceMock struct { // ConfigReloadFunc mocks the ConfigReload method. ConfigReloadFunc func() error - // ShutdownFunc mocks the Shutdown method. - ShutdownFunc func() - // StartFunc mocks the Start method. - StartFunc func(controllerMoqParam *controller.Controller, workdir string) error + StartFunc func(ctx context.Context, controllerMoqParam *controller.Controller, workdir string) error // calls tracks calls to the methods. calls struct { // ConfigReload holds details about calls to the ConfigReload method. - ConfigReload []struct{} - // Shutdown holds details about calls to the Shutdown method. - Shutdown []struct{} + ConfigReload []struct { + } // Start holds details about calls to the Start method. Start []struct { + // Ctx is the ctx argument value. + Ctx context.Context // ControllerMoqParam is the controllerMoqParam argument value. ControllerMoqParam *controller.Controller // Workdir is the workdir argument value. @@ -59,7 +55,6 @@ type InstanceMock struct { } } lockConfigReload sync.RWMutex - lockShutdown sync.RWMutex lockStart sync.RWMutex } @@ -68,7 +63,8 @@ func (mock *InstanceMock) ConfigReload() error { if mock.ConfigReloadFunc == nil { panic("InstanceMock.ConfigReloadFunc: method is nil but Instance.ConfigReload was just called") } - callInfo := struct{}{} + callInfo := struct { + }{} mock.lockConfigReload.Lock() mock.calls.ConfigReload = append(mock.calls.ConfigReload, callInfo) mock.lockConfigReload.Unlock() @@ -78,63 +74,46 @@ func (mock *InstanceMock) ConfigReload() error { // ConfigReloadCalls gets all the calls that were made to ConfigReload. // Check the length with: // len(mockedInstance.ConfigReloadCalls()) -func (mock *InstanceMock) ConfigReloadCalls() []struct{} { - var calls []struct{} +func (mock *InstanceMock) ConfigReloadCalls() []struct { +} { + var calls []struct { + } mock.lockConfigReload.RLock() calls = mock.calls.ConfigReload mock.lockConfigReload.RUnlock() return calls } -// Shutdown calls ShutdownFunc. -func (mock *InstanceMock) Shutdown() { - if mock.ShutdownFunc == nil { - panic("InstanceMock.ShutdownFunc: method is nil but Instance.Shutdown was just called") - } - callInfo := struct{}{} - mock.lockShutdown.Lock() - mock.calls.Shutdown = append(mock.calls.Shutdown, callInfo) - mock.lockShutdown.Unlock() - mock.ShutdownFunc() -} - -// ShutdownCalls gets all the calls that were made to Shutdown. -// Check the length with: -// len(mockedInstance.ShutdownCalls()) -func (mock *InstanceMock) ShutdownCalls() []struct{} { - var calls []struct{} - mock.lockShutdown.RLock() - calls = mock.calls.Shutdown - mock.lockShutdown.RUnlock() - return calls -} - // Start calls StartFunc. -func (mock *InstanceMock) Start(controllerMoqParam *controller.Controller, workdir string) error { +func (mock *InstanceMock) Start(ctx context.Context, controllerMoqParam *controller.Controller, workdir string) error { if mock.StartFunc == nil { panic("InstanceMock.StartFunc: method is nil but Instance.Start was just called") } callInfo := struct { + Ctx context.Context ControllerMoqParam *controller.Controller Workdir string }{ + Ctx: ctx, ControllerMoqParam: controllerMoqParam, Workdir: workdir, } mock.lockStart.Lock() mock.calls.Start = append(mock.calls.Start, callInfo) mock.lockStart.Unlock() - return mock.StartFunc(controllerMoqParam, workdir) + return mock.StartFunc(ctx, controllerMoqParam, workdir) } // StartCalls gets all the calls that were made to Start. // Check the length with: // len(mockedInstance.StartCalls()) func (mock *InstanceMock) StartCalls() []struct { + Ctx context.Context ControllerMoqParam *controller.Controller Workdir string } { var calls []struct { + Ctx context.Context ControllerMoqParam *controller.Controller Workdir string } diff --git a/internal/daemon/session/controller.go b/internal/daemon/session/controller.go index 486b106..7f2e989 100644 --- a/internal/daemon/session/controller.go +++ b/internal/daemon/session/controller.go @@ -103,9 +103,11 @@ func (s *Controller) WaitFinish() chan struct{} { s.cond.Broadcast() - s.wg.Wait() - c := make(chan struct{}) - close(c) + go func() { + s.wg.Wait() + close(c) + }() + return c } diff --git a/internal/daemon/session/logstash_controller_mock_test.go b/internal/daemon/session/logstash_controller_mock_test.go index 0917476..295749d 100644 --- a/internal/daemon/session/logstash_controller_mock_test.go +++ b/internal/daemon/session/logstash_controller_mock_test.go @@ -61,14 +61,16 @@ type LogstashControllerMock struct { ExpectedEvents int } // GetResults holds details about calls to the GetResults method. - GetResults []struct{} + GetResults []struct { + } // SetupTest holds details about calls to the SetupTest method. SetupTest []struct { // Pipelines is the pipelines argument value. Pipelines pipeline.Pipelines } // Teardown holds details about calls to the Teardown method. - Teardown []struct{} + Teardown []struct { + } } lockExecuteTest sync.RWMutex lockGetResults sync.RWMutex @@ -116,7 +118,8 @@ func (mock *LogstashControllerMock) GetResults() ([]string, error) { if mock.GetResultsFunc == nil { panic("LogstashControllerMock.GetResultsFunc: method is nil but LogstashController.GetResults was just called") } - callInfo := struct{}{} + callInfo := struct { + }{} mock.lockGetResults.Lock() mock.calls.GetResults = append(mock.calls.GetResults, callInfo) mock.lockGetResults.Unlock() @@ -126,8 +129,10 @@ func (mock *LogstashControllerMock) GetResults() ([]string, error) { // GetResultsCalls gets all the calls that were made to GetResults. // Check the length with: // len(mockedLogstashController.GetResultsCalls()) -func (mock *LogstashControllerMock) GetResultsCalls() []struct{} { - var calls []struct{} +func (mock *LogstashControllerMock) GetResultsCalls() []struct { +} { + var calls []struct { + } mock.lockGetResults.RLock() calls = mock.calls.GetResults mock.lockGetResults.RUnlock() @@ -170,7 +175,8 @@ func (mock *LogstashControllerMock) Teardown() error { if mock.TeardownFunc == nil { panic("LogstashControllerMock.TeardownFunc: method is nil but LogstashController.Teardown was just called") } - callInfo := struct{}{} + callInfo := struct { + }{} mock.lockTeardown.Lock() mock.calls.Teardown = append(mock.calls.Teardown, callInfo) mock.lockTeardown.Unlock() @@ -180,8 +186,10 @@ func (mock *LogstashControllerMock) Teardown() error { // TeardownCalls gets all the calls that were made to Teardown. // Check the length with: // len(mockedLogstashController.TeardownCalls()) -func (mock *LogstashControllerMock) TeardownCalls() []struct{} { - var calls []struct{} +func (mock *LogstashControllerMock) TeardownCalls() []struct { +} { + var calls []struct { + } mock.lockTeardown.RLock() calls = mock.calls.Teardown mock.lockTeardown.RUnlock()