From 5575531af8ec911d785cc87584a5c7964ab44e71 Mon Sep 17 00:00:00 2001 From: Charlie Jones Date: Wed, 11 Sep 2024 16:46:22 -0500 Subject: [PATCH] Add ShellExecutioner --- internal/goverseer/executioner/executioner.go | 3 + .../shell_executioner/shell_executioner.go | 234 ++++++++++++++++++ .../shell_executioner_test.go | 170 +++++++++++++ 3 files changed, 407 insertions(+) create mode 100644 internal/goverseer/executioner/shell_executioner/shell_executioner.go create mode 100644 internal/goverseer/executioner/shell_executioner/shell_executioner_test.go diff --git a/internal/goverseer/executioner/executioner.go b/internal/goverseer/executioner/executioner.go index c08b1c9..0955c11 100644 --- a/internal/goverseer/executioner/executioner.go +++ b/internal/goverseer/executioner/executioner.go @@ -8,6 +8,7 @@ import ( "github.com/lmittmann/tint" "github.com/simplifi/goverseer/internal/goverseer/config" "github.com/simplifi/goverseer/internal/goverseer/executioner/log_executioner" + "github.com/simplifi/goverseer/internal/goverseer/executioner/shell_executioner" ) // Executioner is an interface for executing actions @@ -28,6 +29,8 @@ func New(cfg *config.Config) (Executioner, error) { switch cfg.Executioner.Type { case "log": return log_executioner.New(*cfg, logger) + case "shell": + return shell_executioner.New(*cfg, logger) default: return nil, fmt.Errorf("unknown executioner type: %s", cfg.Executioner.Type) } diff --git a/internal/goverseer/executioner/shell_executioner/shell_executioner.go b/internal/goverseer/executioner/shell_executioner/shell_executioner.go new file mode 100644 index 0000000..1e27b61 --- /dev/null +++ b/internal/goverseer/executioner/shell_executioner/shell_executioner.go @@ -0,0 +1,234 @@ +package shell_executioner + +import ( + "bufio" + "context" + "fmt" + "io" + "log/slog" + "os" + "os/exec" + + "github.com/lmittmann/tint" + "github.com/simplifi/goverseer/internal/goverseer/config" +) + +const ( + // DataEnvVarName is the name of the environment variable that will be set + // to the path of the file containing the data + DataEnvVarName = "GOVERSEER_DATA" + + // DefaultShell is the default shell to use when executing a command + DefaultShell = "/bin/sh" +) + +// ShellExecutionerConfig is the configuration for a shell executioner +type ShellExecutionerConfig struct { + // Command is the command to execute + Command string + + // Shell is the shell to use when executing the command + Shell string +} + +// ParseConfig parses the config for a log executioner +// It validates the config, sets defaults if missing, and returns the config +func ParseConfig(config interface{}) (*ShellExecutionerConfig, error) { + cfgMap, ok := config.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("invalid config") + } + + cfg := &ShellExecutionerConfig{ + Shell: DefaultShell, + } + + // Command is required and must be a string + if command, ok := cfgMap["command"].(string); ok { + if command == "" { + return nil, fmt.Errorf("command must not be empty") + } + cfg.Command = command + } else if cfgMap["command"] != nil { + return nil, fmt.Errorf("command must be a string") + } else { + return nil, fmt.Errorf("command is required") + } + + // If shell is set, it should be a string + if cfgMap["shell"] != nil { + if shell, ok := cfgMap["shell"].(string); ok { + if shell == "" { + return nil, fmt.Errorf("shell must not be empty") + } + cfg.Shell = shell + } else if cfgMap["shell"] != nil { + return nil, fmt.Errorf("shell must be a string") + } + } + + return cfg, nil +} + +// ShellExecutioner runs a shell command +// It implements the Executioner interface +type ShellExecutioner struct { + // Command is the command to execute + Command string + + // Shell is the shell to use when executing the command + Shell string + + // workDir is the directory in which the ShellExecutioner will store + // the command to run and the data to pass into the command + workDir string + + // log is the logger + log *slog.Logger + + // stop is a channel to signal the executor to stop + stop chan struct{} + + // ctx is the context used to control the command and output scanner + ctx context.Context + + // cancel is the function to cancel the context + cancel context.CancelFunc +} + +// New creates a new ShellExecutioner based on the config +func New(cfg config.Config, log *slog.Logger) (*ShellExecutioner, error) { + pcfg, err := ParseConfig(cfg.Executioner.Config) + if err != nil { + return nil, fmt.Errorf("error parsing config: %w", err) + } + + // Create a temp directory to store the command and data + workDir, err := os.MkdirTemp("", fmt.Sprintf("goverseer-%s", cfg.Name)) + if err != nil { + return nil, fmt.Errorf("error creating work dir: %w", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + + return &ShellExecutioner{ + Command: pcfg.Command, + Shell: pcfg.Shell, + workDir: workDir, + log: log, + stop: make(chan struct{}), + ctx: ctx, + cancel: cancel, + }, nil +} + +// streamOutput streams the output of a pipe to the logger +func (e *ShellExecutioner) streamOutput(pipe io.ReadCloser) { + scanner := bufio.NewScanner(pipe) + for { + select { + case <-e.ctx.Done(): + e.log.Info("stopping output scanner") + return + default: + if scanner.Scan() { + e.log.Info("command", slog.String("output", scanner.Text())) + } else { + if err := scanner.Err(); err != nil { + // Avoid logging errors if the context was canceled mid-scan + // This will happen when the executioner is being stopped + if e.ctx.Err() == context.Canceled { + continue + } + e.log.Error("error reading output", tint.Err(err)) + } + return + } + } + } +} + +// writeToWorkDir writes the data to a file in the temporary work directory +// It returns the path to the file and an error if the data could not be written +func (e *ShellExecutioner) writeToWorkDir(name string, data interface{}) (string, error) { + filePath := fmt.Sprintf("%s/%s", e.workDir, name) + if err := os.WriteFile(filePath, []byte(data.(string)), 0644); err != nil { + return "", fmt.Errorf("error writing file to work dir: %w", err) + } + e.log.Info("wrote file to work dir", slog.String("path", filePath)) + return filePath, nil +} + +// Execute runs the command with the given data +// It returns an error if the command could not be started or if the command +// returned an error. +// The data is written to a temp file and the path is passed to the command via +// the DataEnvVarName environment variable. +// The command is started in the configured shell. +func (e *ShellExecutioner) Execute(data interface{}) error { + var dataPath, commandPath string + var err error + + defer os.RemoveAll(e.workDir) + + // Write the data to a file in the work directory + if dataPath, err = e.writeToWorkDir("data", data); err != nil { + return fmt.Errorf("error writing data to work dir: %w", err) + } + + // Write the command to a file in the work directory + if commandPath, err = e.writeToWorkDir("command", e.Command); err != nil { + return fmt.Errorf("error writing command to work dir: %w", err) + } + + // Build the command + cmd := exec.CommandContext(e.ctx, e.Shell, commandPath) + cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", DataEnvVarName, dataPath)) + + // Handle output from command + combinedOutput, err := cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("error creating output pipe: %w", err) + } + defer combinedOutput.Close() + + // Redirect stderr to stdout + cmd.Stderr = cmd.Stdout + + // Stream combined output to the logger + go func() { + e.streamOutput(combinedOutput) + }() + + // Start the command running + // This does not block and depends on the caller to call cmd.Wait() + if err := cmd.Start(); err != nil { + return fmt.Errorf("error starting command: %w", err) + } + + // Wait for the command to finish running, but don't block otherwise we'll + // never be able to stop the executor if the command hangs + wait := make(chan error, 1) + go func() { + wait <- cmd.Wait() + }() + + // Block here waiting for the command to complete or for the executor to stop + select { + case <-e.stop: + e.cancel() + return nil + case err := <-wait: + if err != nil { + return fmt.Errorf("error running command: %w", err) + } + } + + return nil +} + +// Stop signals the executioner to stop +func (e *ShellExecutioner) Stop() { + e.log.Info("shutting down executor") + close(e.stop) +} diff --git a/internal/goverseer/executioner/shell_executioner/shell_executioner_test.go b/internal/goverseer/executioner/shell_executioner/shell_executioner_test.go new file mode 100644 index 0000000..65bfde1 --- /dev/null +++ b/internal/goverseer/executioner/shell_executioner/shell_executioner_test.go @@ -0,0 +1,170 @@ +package shell_executioner + +import ( + "context" + "log/slog" + "os" + "testing" + "time" + + "github.com/lmittmann/tint" + "github.com/simplifi/goverseer/internal/goverseer/config" + "github.com/stretchr/testify/assert" +) + +func TestParseConfig(t *testing.T) { + var parsedConfig *ShellExecutionerConfig + + parsedConfig, err := ParseConfig(map[string]interface{}{ + "command": "echo 123", + }) + assert.NoError(t, err, + "Parsing a valid config should not return an error") + assert.Equal(t, "echo 123", parsedConfig.Command, + "Command should be set to the value in the config") + assert.Equal(t, DefaultShell, parsedConfig.Shell, + "Shell should be set to the default value") + + // Test setting the shell + parsedConfig, err = ParseConfig(map[string]interface{}{ + "command": "echo 123", + "shell": "/bin/bash", + }) + assert.NoError(t, err, + "Parsing a valid config should not return an error") + assert.Equal(t, "/bin/bash", parsedConfig.Shell, + "Shell should be set to the value in the config") + + parsedConfig, err = ParseConfig(map[string]interface{}{ + "command": "echo 123", + "shell": nil, + }) + assert.NoError(t, err, + "Parsing a config with a nil shell should not return an error") + assert.Equal(t, DefaultShell, parsedConfig.Shell, + "Parsing a config with a nil shell should set default value") + + _, err = ParseConfig(map[string]interface{}{ + "command": "echo 123", + "shell": 1, + }) + assert.Error(t, err, + "Parsing a config with an incorrect shell type should return an error") + + _, err = ParseConfig(map[string]interface{}{ + "command": "echo 123", + "shell": "", + }) + assert.Error(t, err, + "Parsing a config with an empty shell should return an error") + + // Test setting the command + parsedConfig, err = ParseConfig(map[string]interface{}{ + "command": "echo 123", + }) + assert.NoError(t, err, + "Parsing a config with a valid command should not return an error") + assert.Equal(t, "echo 123", parsedConfig.Command, + "Command should be set to the value in the config") + + _, err = ParseConfig(map[string]interface{}{ + "command": 1, + }) + assert.Error(t, err, + "Parsing a config with an incorrect command type should return an error") + + _, err = ParseConfig(map[string]interface{}{ + "command": "", + }) + assert.Error(t, err, + "Parsing a config with an empty command should return an error") + + _, err = ParseConfig(map[string]interface{}{ + "command": nil, + }) + assert.Error(t, err, + "Parsing a config with a nil command should return an error") + + _, err = ParseConfig(map[string]interface{}{}) + assert.Error(t, err, + "Parsing a config with no command should return an error") +} + +func TestNew(t *testing.T) { + var cfg config.Config + cfg = config.Config{ + Name: "TestConfig", + Executioner: config.ExecutionerConfig{ + Type: "shell", + Config: map[string]interface{}{ + "command": "echo 123", + }, + }, + } + executioner, err := New(cfg, slog.Default()) + assert.NoError(t, err, + "Creating a new ShellExecutioner should not return an error") + assert.NotNil(t, executioner, + "Creating a new ShellExecutioner should return an executioner") + + cfg = config.Config{ + Name: "TestConfig", + Executioner: config.ExecutionerConfig{ + Type: "shell", + }, + } + executioner, err = New(cfg, slog.Default()) + assert.Error(t, err, + "Creating a new ShellExecutioner with an invalid config should return an error") + assert.Nil(t, executioner, + "Creating a new ShellExecutioner with an invalid config should not return an executioner") +} + +func TestShellExecutioner_Execute(t *testing.T) { + log := slog.New(tint.NewHandler(os.Stderr, &tint.Options{Level: slog.LevelError})) + tempDir, _ := os.MkdirTemp("", "goverseer-test") + ctx, cancel := context.WithCancel(context.Background()) + executioner := ShellExecutioner{ + Command: "echo ${GOVERSEER_DATA}", + Shell: DefaultShell, + workDir: tempDir, + log: log, + stop: make(chan struct{}), + ctx: ctx, + cancel: cancel, + } + + err := executioner.Execute("test_data") + assert.NoError(t, err, + "Executing a valid command should not return an error") +} + +func TestShellExecutioner_Stop(t *testing.T) { + log := slog.New(tint.NewHandler(os.Stderr, &tint.Options{Level: slog.LevelError})) + tempDir, _ := os.MkdirTemp("", "goverseer-test") + ctx, cancel := context.WithCancel(context.Background()) + executioner := ShellExecutioner{ + Command: "for i in {1..1000}; do echo $i; sleep 1; done", + Shell: DefaultShell, + workDir: tempDir, + log: log, + stop: make(chan struct{}), + ctx: ctx, + cancel: cancel, + } + + go func() { + executioner.Execute("test_data") + }() + + executioner.Stop() + + // Sleep for a second to allow the executioner time to stop + time.Sleep(1 * time.Second) + + assert.Equal(t, 0, len(executioner.stop), + "Stopping the executioner should close the stop channel") + + assert.Equal(t, context.Canceled, executioner.ctx.Err(), + "Stopping the executioner should cancel the context") +}