diff --git a/cmd/launcher/main.go b/cmd/launcher/main.go index 96dc98b..a1acd17 100644 --- a/cmd/launcher/main.go +++ b/cmd/launcher/main.go @@ -7,28 +7,37 @@ import ( "os" "task-runner-launcher/internal/commands" + "task-runner-launcher/internal/config" "task-runner-launcher/internal/errorreporting" "task-runner-launcher/internal/http" "task-runner-launcher/internal/logs" + + "github.com/sethvargo/go-envconfig" ) func main() { - logLevel := os.Getenv("N8N_LAUNCHER_LOG_LEVEL") - - logs.SetLevel(logLevel) // default info - flag.Usage = func() { - logs.Infof("Usage: %s [runner-type]", os.Args[0]) + fmt.Printf("Usage: %s [runner-type]\n", os.Args[0]) flag.PrintDefaults() } if len(os.Args) < 2 { - logs.Error("Missing runner-type argument") + os.Stderr.WriteString("Missing runner-type argument") flag.Usage() os.Exit(1) } - errorreporting.Init() + runnerType := os.Args[1] + + cfg, err := config.LoadConfig(runnerType, envconfig.OsLookuper()) + if err != nil { + logs.Errorf("Failed to load config: %v", err) + os.Exit(1) + } + + logs.SetLevel(cfg.LogLevel) + + errorreporting.Init(cfg.Sentry) defer errorreporting.Close() srv := http.NewHealthCheckServer() @@ -46,10 +55,9 @@ func main() { } }() - runnerType := os.Args[1] - cmd := &commands.LaunchCommand{RunnerType: runnerType} + cmd := &commands.LaunchCommand{} - if err := cmd.Execute(); err != nil { + if err := cmd.Execute(cfg); err != nil { logs.Errorf("Failed to execute `launch` command: %s", err) } } diff --git a/go.mod b/go.mod index 16270dd..3be4b1c 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,14 @@ go 1.23.3 require ( github.com/getsentry/sentry-go v0.29.1 github.com/gorilla/websocket v1.5.3 + github.com/sethvargo/go-envconfig v1.1.0 + github.com/stretchr/testify v1.8.2 ) require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index bc8bad3..42cd9f7 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,12 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/getsentry/sentry-go v0.29.1 h1:DyZuChN8Hz3ARxGVV8ePaNXh1dQ7d76AiB117xcREwA= github.com/getsentry/sentry-go v0.29.1/go.mod h1:x3AtIzN01d6SiWkderzaH28Tm0lgkafpJ5Bm3li39O0= github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= @@ -14,11 +15,21 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sethvargo/go-envconfig v1.1.0 h1:cWZiJxeTm7AlCvzGXrEXaSTCNgip5oJepekh/BOQuog= +github.com/sethvargo/go-envconfig v1.1.0/go.mod h1:JLd0KFWQYzyENqnEPWWZ49i4vzZo/6nRidxI8YvGiHw= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/commands/launch.go b/internal/commands/launch.go index 457b876..20e084b 100644 --- a/internal/commands/launch.go +++ b/internal/commands/launch.go @@ -20,66 +20,46 @@ type Command interface { Execute() error } -type LaunchCommand struct { - RunnerType string -} - -func (l *LaunchCommand) Execute() error { - logs.Info("Starting to execute `launch` command") +type LaunchCommand struct{} - // 0. validate env vars - - envCfg, err := env.FromEnv() - if err != nil { - return fmt.Errorf("env vars failed validation: %w", err) - } - - // 1. read runner config - - runnerCfg, err := config.GetRunnerConfig(l.RunnerType) - if err != nil { - return fmt.Errorf("failed to get runner config: %w", err) - } +func (l *LaunchCommand) Execute(cfg *config.Config) error { + logs.Info("Starting launcher...") - // 2. change into working directory + // 1. change into working directory - if err := os.Chdir(runnerCfg.WorkDir); err != nil { - return fmt.Errorf("failed to chdir into configured dir (%s): %w", runnerCfg.WorkDir, err) + if err := os.Chdir(cfg.Runner.WorkDir); err != nil { + return fmt.Errorf("failed to chdir into configured dir (%s): %w", cfg.Runner.WorkDir, err) } - logs.Debugf("Changed into working directory: %s", runnerCfg.WorkDir) + logs.Debugf("Changed into working directory: %s", cfg.Runner.WorkDir) - // 3. filter environment variables + // 2. prepare env vars to pass to runner - defaultEnvs := []string{"LANG", "PATH", "TZ", "TERM", env.EnvVarIdleTimeout} - allowedEnvs := append(defaultEnvs, runnerCfg.AllowedEnv...) - runnerEnv := env.AllowedOnly(allowedEnvs) - // Static values - runnerEnv = append(runnerEnv, "N8N_RUNNERS_SERVER_ENABLED=true") + runnerEnv := env.PrepareRunnerEnv(cfg) - logs.Debugf("Filtered environment variables") + logs.Debugf("Prepared env vars for runner") for { - // 4. check until task broker is ready + // 3. check until task broker is ready - if err := http.CheckUntilBrokerReady(envCfg.TaskBrokerServerURI); err != nil { + if err := http.CheckUntilBrokerReady(cfg.TaskBrokerURI); err != nil { return fmt.Errorf("encountered error while waiting for broker to be ready: %w", err) } - // 5. fetch grant token for launcher + // 4. fetch grant token for launcher - launcherGrantToken, err := http.FetchGrantToken(envCfg.TaskBrokerServerURI, envCfg.AuthToken) + launcherGrantToken, err := http.FetchGrantToken(cfg.TaskBrokerURI, cfg.AuthToken) if err != nil { return fmt.Errorf("failed to fetch grant token for launcher: %w", err) } logs.Debug("Fetched grant token for launcher") - // 6. connect to main and wait for task offer to be accepted + // 5. connect to main and wait for task offer to be accepted handshakeCfg := ws.HandshakeConfig{ - TaskType: l.RunnerType, - TaskBrokerServerURI: envCfg.TaskBrokerServerURI, + TaskType: cfg.Runner.RunnerType, + TaskBrokerServerURI: cfg.TaskBrokerURI, GrantToken: launcherGrantToken, } @@ -93,9 +73,9 @@ func (l *LaunchCommand) Execute() error { return fmt.Errorf("handshake failed: %w", err) } - // 7. fetch grant token for runner + // 6. fetch grant token for runner - runnerGrantToken, err := http.FetchGrantToken(envCfg.TaskBrokerServerURI, envCfg.AuthToken) + runnerGrantToken, err := http.FetchGrantToken(cfg.TaskBrokerURI, cfg.AuthToken) if err != nil { return fmt.Errorf("failed to fetch grant token for runner: %w", err) } @@ -107,14 +87,14 @@ func (l *LaunchCommand) Execute() error { // 8. launch runner logs.Debug("Task ready for pickup, launching runner...") - logs.Debugf("Command: %s", runnerCfg.Command) - logs.Debugf("Args: %v", runnerCfg.Args) + logs.Debugf("Command: %s", cfg.Runner.Command) + logs.Debugf("Args: %v", cfg.Runner.Args) logs.Debugf("Env vars: %v", env.Keys(runnerEnv)) ctx, cancelHealthMonitor := context.WithCancel(context.Background()) var wg sync.WaitGroup - cmd := exec.CommandContext(ctx, runnerCfg.Command, runnerCfg.Args...) + cmd := exec.CommandContext(ctx, cfg.Runner.Command, cfg.Runner.Args...) cmd.Env = runnerEnv cmd.Stdout, cmd.Stderr = logs.GetRunnerWriters() diff --git a/internal/config/config.go b/internal/config/config.go index cab8aa0..9dbe70e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,65 +1,148 @@ -// Package config provides functions to use the launcher configuration file. package config import ( + "context" "encoding/json" + "errors" "fmt" "os" + "strconv" + "task-runner-launcher/internal/errs" "task-runner-launcher/internal/logs" + + "github.com/sethvargo/go-envconfig" ) var configPath = "/etc/n8n-task-runners.json" -type TaskRunnerConfig struct { - // Type of task runner, currently only "javascript" supported +var cfg Config + +// Config holds the full configuration for the launcher. +type Config struct { + // LogLevel is the log level for the launcher. Default: `info`. + LogLevel string `env:"N8N_LAUNCHER_LOG_LEVEL, default=info"` + + // AuthToken is the auth token sent by the launcher to the task broker in + // exchange for a single-use grant token, later passed to the runner. + AuthToken string `env:"N8N_RUNNERS_AUTH_TOKEN, required"` + + // AutoShutdownTimeout is how long (in seconds) a runner may be idle for + // before automatically shutting down, until later relaunched. + AutoShutdownTimeout string `env:"N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT, default=15"` + + // TaskBrokerURI is the URI of the task broker server. + TaskBrokerURI string `env:"N8N_TASK_BROKER_URI, default=http://127.0.0.1:5679"` + + // Runner is the runner config for the task runner, obtained from: + // `/etc/n8n-task-runners.json`. + Runner *RunnerConfig + + // Sentry is the Sentry config for the launcher, a subset of what is defined in: + // https://docs.sentry.io/platforms/go/configuration/options/ + Sentry *SentryConfig +} + +type SentryConfig struct { + IsEnabled bool + Dsn string `env:"SENTRY_DSN"` // If unset, Sentry will be disabled. + Release string `env:"N8N_VERSION, default=unknown"` + Environment string `env:"ENVIRONMENT, default=unknown"` + DeploymentName string `env:"DEPLOYMENT_NAME, default=unknown"` +} + +type RunnerConfig struct { + // Type of task runner, currently only "javascript" supported. RunnerType string `json:"runner-type"` - // Path to directory containing launcher (Go binary) + // Path to dir containing launcher. WorkDir string `json:"workdir"` - // Command to execute to start task runner + // Command to start runner. Command string `json:"command"` - // Arguments for command to execute, currently path to task runner entrypoint + // Arguments for command, currently path to runner entrypoint. Args []string `json:"args"` - // Env vars allowed to be passed by launcher to task runner + // Env vars allowed to be passed by launcher to runner. AllowedEnv []string `json:"allowed-env"` } -type LauncherConfig struct { - TaskRunners []TaskRunnerConfig `json:"task-runners"` -} +func LoadConfig(runnerType string, lookuper envconfig.Lookuper) (*Config, error) { + ctx := context.Background() -func readConfig() (*LauncherConfig, error) { - data, err := os.ReadFile(configPath) + if err := envconfig.ProcessWith(ctx, &envconfig.Config{ + Target: &cfg, + Lookuper: lookuper, + }); err != nil { + return nil, err + } + + var cfgErrs []error + + // launcher + + if err := validateURL(cfg.TaskBrokerURI, "N8N_TASK_BROKER_URI"); err != nil { + cfgErrs = append(cfgErrs, err) + } + + timeoutInt, err := strconv.Atoi(cfg.AutoShutdownTimeout) if err != nil { - return nil, fmt.Errorf("failed to open config file at %s: %w", configPath, err) + cfgErrs = append(cfgErrs, errs.ErrNonIntegerAutoShutdownTimeout) + } else if timeoutInt < 0 { + cfgErrs = append(cfgErrs, errs.ErrNegativeAutoShutdownTimeout) } - var config LauncherConfig - if err := json.Unmarshal(data, &config); err != nil { - return nil, fmt.Errorf("failed to parse config file at %s: %w", configPath, err) + // runner + + runnerCfg, err := readFileConfig(runnerType) + if err != nil { + cfgErrs = append(cfgErrs, err) + } + + cfg.Runner = runnerCfg + + // sentry + + if cfg.Sentry.Dsn != "" { + if err := validateURL(cfg.Sentry.Dsn, "SENTRY_DSN"); err != nil { + cfgErrs = append(cfgErrs, err) + } else { + cfg.Sentry.IsEnabled = true + } } - if len(config.TaskRunners) == 0 { - return nil, fmt.Errorf("found no task runner configs inside launcher config") + if len(cfgErrs) > 0 { + return nil, errors.Join(cfgErrs...) } - return &config, nil + return &cfg, nil } -// GetRunnerConfig retrieves and validates the runner configuration for a given runner type. -func GetRunnerConfig(runnerType string) (*TaskRunnerConfig, error) { - fileCfg, err := readConfig() +// readFileConfig reads the config file at `/etc/n8n-task-runners.json` and +// returns the runner config for the requested runner type. +func readFileConfig(requestedRunnerType string) (*RunnerConfig, error) { + data, err := os.ReadFile(configPath) if err != nil { - return nil, fmt.Errorf("error reading config file: %w", err) + return nil, fmt.Errorf("failed to open config file at %s: %w", configPath, err) } - var runnerCfg TaskRunnerConfig + var fileCfg struct { + TaskRunners []RunnerConfig `json:"task-runners"` + } + if err := json.Unmarshal(data, &fileCfg); err != nil { + return nil, fmt.Errorf("failed to parse config file at %s: %w", configPath, err) + } + + taskRunnersNum := len(fileCfg.TaskRunners) + + if taskRunnersNum == 0 { + return nil, errs.ErrMissingRunnerConfig + } + + var runnerCfg RunnerConfig found := false for _, r := range fileCfg.TaskRunners { - if r.RunnerType == runnerType { + if r.RunnerType == requestedRunnerType { runnerCfg = r found = true break @@ -67,10 +150,9 @@ func GetRunnerConfig(runnerType string) (*TaskRunnerConfig, error) { } if !found { - return nil, fmt.Errorf("config file does not contain requested runner type: %s", runnerType) + return nil, fmt.Errorf("config file at %s does not contain requested runner type: %s", configPath, requestedRunnerType) } - taskRunnersNum := len(fileCfg.TaskRunners) if taskRunnersNum == 1 { logs.Debug("Loaded config file with a single runner config") } else { diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 647b395..2bab0ee 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -3,144 +3,157 @@ package config import ( "os" "path/filepath" - "reflect" + "strings" "testing" + + "github.com/sethvargo/go-envconfig" ) -func TestGetRunnerConfig(t *testing.T) { - tmpDir, err := os.MkdirTemp("", "task-runner-launcher-config-test") - if err != nil { - t.Fatalf("Failed to create temp dir: %v", err) - } - defer os.RemoveAll(tmpDir) +func TestLoadConfig(t *testing.T) { + testConfigPath := filepath.Join(t.TempDir(), "testconfig.json") - original := configPath - configPath = filepath.Join(tmpDir, "n8n-task-runners.json") - defer func() { configPath = original }() + validConfigContent := `{ + "task-runners": [{ + "runner-type": "javascript", + "workdir": "/test/dir", + "command": "node", + "args": ["/test/start.js"], + "allowed-env": ["PATH", "NODE_ENV"] + }] + }` tests := []struct { - name string - configContent string - runnerType string - expectError bool - expectedConfig *TaskRunnerConfig + name string + configContent string + envVars map[string]string + runnerType string + expectedError bool + errorMsg string }{ { - name: "valid single runner config", - configContent: `{ - "task-runners": [ - { - "runner-type": "javascript", - "workdir": "/usr/local/bin", - "command": "/usr/local/bin/node", - "args": ["/usr/local/lib/node_modules/n8n/node_modules/@n8n/task-runner/dist/start.js"], - "allowed-env": ["PATH", "NODE_OPTIONS"] - } - ] - }`, - runnerType: "javascript", - expectedConfig: &TaskRunnerConfig{ - RunnerType: "javascript", - WorkDir: "/usr/local/bin", - Command: "/usr/local/bin/node", - Args: []string{"/usr/local/lib/node_modules/n8n/node_modules/@n8n/task-runner/dist/start.js"}, - AllowedEnv: []string{"PATH", "NODE_OPTIONS"}, + name: "valid configuration", + configContent: validConfigContent, + envVars: map[string]string{ + "N8N_RUNNERS_AUTH_TOKEN": "test-token", + "N8N_TASK_BROKER_URI": "http://localhost:5679", + "SENTRY_DSN": "https://test@sentry.io/123", }, + runnerType: "javascript", + expectedError: false, }, { - name: "valid multiple runner config", - configContent: `{ - "task-runners": [ - { - "runner-type": "javascript", - "workdir": "/usr/local/bin", - "command": "/usr/local/bin/node", - "args": ["/start.js"], - "allowed-env": ["PATH"] - }, - { - "runner-type": "python", - "workdir": "/usr/local/bin", - "command": "/usr/local/bin/python", - "args": ["/start.py"], - "allowed-env": ["PYTHONPATH"] - } - ] - }`, - runnerType: "python", - expectedConfig: &TaskRunnerConfig{ - RunnerType: "python", - WorkDir: "/usr/local/bin", - Command: "/usr/local/bin/python", - Args: []string{"/start.py"}, - AllowedEnv: []string{"PYTHONPATH"}, + name: "valid configuration", + configContent: validConfigContent, + envVars: map[string]string{ + "N8N_RUNNERS_AUTH_TOKEN": "test-token", + "N8N_TASK_BROKER_URI": "http://127.0.0.1:5679", + "SENTRY_DSN": "https://test@sentry.io/123", }, + runnerType: "javascript", + expectedError: false, }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + + configPath = testConfigPath + + err := os.WriteFile(configPath, []byte(tt.configContent), 0600) + if err != nil { + t.Fatalf("Failed to write test config file: %v", err) + } + + lookuper := envconfig.MapLookuper(tt.envVars) + _, err = LoadConfig(tt.runnerType, lookuper) + + if tt.expectedError && err == nil { + t.Error("Expected error but got nil") + return + } + + if !tt.expectedError && err != nil { + t.Errorf("Unexpected error: %v", err) + return + } + + if tt.expectedError && !strings.Contains(err.Error(), tt.errorMsg) { + t.Errorf("Expected error containing %q, got %q", tt.errorMsg, err.Error()) + } + }) + } +} + +func TestConfigFileErrors(t *testing.T) { + testConfigPath := filepath.Join(t.TempDir(), "testconfig.json") + + tests := []struct { + name string + configContent string + expectedError string + envVars map[string]string + }{ { - name: "runner type not found", - configContent: `{ - "task-runners": [ - { - "runner-type": "javascript", - "workdir": "/usr/local/bin", - "command": "/usr/local/bin/node", - "args": ["/start.js"], - "allowed-env": ["PATH"] - } - ] - }`, - runnerType: "python", - expectError: true, + name: "invalid JSON in config file", + configContent: "invalid json", + expectedError: "failed to parse config file", + envVars: map[string]string{ + "N8N_RUNNERS_AUTH_TOKEN": "test-token", + "N8N_TASK_BROKER_URI": "http://localhost:5679", + }, }, { name: "empty task runners array", configContent: `{ "task-runners": [] }`, - runnerType: "javascript", - expectError: true, - }, - { - name: "invalid json", - configContent: `{"task-runners": [{"runner-type": "javascript"`, - runnerType: "javascript", - expectError: true, + expectedError: "found no task runner configs", + envVars: map[string]string{ + "N8N_RUNNERS_AUTH_TOKEN": "test-token", + "N8N_TASK_BROKER_URI": "http://localhost:5679", + }, }, { - name: "missing config file", - configContent: "", - runnerType: "javascript", - expectError: true, + name: "runner type not found", + configContent: `{ + "task-runners": [{ + "runner-type": "python", + "workdir": "/test/dir", + "command": "python", + "args": ["/test/start.py"], + "allowed-env": ["PATH", "PYTHONPATH"] + }] + }`, + expectedError: "does not contain requested runner type: javascript", + envVars: map[string]string{ + "N8N_RUNNERS_AUTH_TOKEN": "test-token", + "N8N_TASK_BROKER_URI": "http://localhost:5679", + }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + configPath = testConfigPath + if tt.configContent != "" { err := os.WriteFile(configPath, []byte(tt.configContent), 0600) if err != nil { t.Fatalf("Failed to write test config file: %v", err) } - } else { - os.Remove(configPath) } - config, err := GetRunnerConfig(tt.runnerType) + lookuper := envconfig.MapLookuper(tt.envVars) + _, err := LoadConfig("javascript", lookuper) - if tt.expectError { - if err == nil { - t.Error("Expected error but got nil") - } - return - } - - if err != nil { - t.Errorf("Unexpected error: %v", err) + if err == nil { + t.Error("Expected error but got nil") return } - if !reflect.DeepEqual(config, tt.expectedConfig) { - t.Errorf("Config mismatch\nGot: %+v\nWant: %+v", config, tt.expectedConfig) + if !strings.Contains(err.Error(), tt.expectedError) { + t.Errorf("Expected error containing %q, got %q", tt.expectedError, err.Error()) } }) } diff --git a/internal/config/validate_url.go b/internal/config/validate_url.go new file mode 100644 index 0000000..f56cf44 --- /dev/null +++ b/internal/config/validate_url.go @@ -0,0 +1,24 @@ +package config + +import ( + "fmt" + "net/url" +) + +func validateURL(urlStr string, urlName string) error { + if urlStr == "" { + return fmt.Errorf("%s must be a valid URL but is empty", urlName) + } + + u, err := url.Parse(urlStr) + + if err != nil { + return fmt.Errorf("%s must be a valid URL: %w", urlName, err) + } + + if u.Scheme != "http" && u.Scheme != "https" { + return fmt.Errorf("%s must use http:// or https:// scheme", urlName) + } + + return nil +} diff --git a/internal/config/validate_url_test.go b/internal/config/validate_url_test.go new file mode 100644 index 0000000..6a5d0f4 --- /dev/null +++ b/internal/config/validate_url_test.go @@ -0,0 +1,70 @@ +package config + +import ( + "strings" + "testing" +) + +func TestValidateURL(t *testing.T) { + tests := []struct { + name string + url string + fieldName string + expectError bool + errorMsg string + }{ + { + name: "valid http URL", + url: "http://localhost:5679", + fieldName: "test_field", + expectError: false, + }, + { + name: "valid https URL", + url: "https://example.com", + fieldName: "test_field", + expectError: false, + }, + { + name: "scheme-less localhost", + url: "localhost:5679", + fieldName: "test_field", + expectError: true, + errorMsg: "must use http:// or https:// scheme", + }, + { + name: "invalid URL", + url: "http:// invalid url", + fieldName: "test_field", + expectError: true, + errorMsg: "must be a valid URL", + }, + { + name: "empty URL", + url: "", + fieldName: "test_field", + expectError: true, + errorMsg: "must be a valid URL but is empty", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validateURL(tt.url, tt.fieldName) + + if tt.expectError && err == nil { + t.Error("Expected error but got nil") + return + } + + if !tt.expectError && err != nil { + t.Errorf("Unexpected error: %v", err) + return + } + + if tt.expectError && !strings.Contains(err.Error(), tt.errorMsg) { + t.Errorf("Expected error containing %q, got %q", tt.errorMsg, err.Error()) + } + }) + } +} diff --git a/internal/env/env.go b/internal/env/env.go index 8236d49..a8ad19c 100644 --- a/internal/env/env.go +++ b/internal/env/env.go @@ -1,60 +1,32 @@ package env import ( - "errors" "fmt" - "net/url" "os" "sort" - "strconv" "strings" + "task-runner-launcher/internal/config" ) const ( - // ------------------------ - // auth - // ------------------------ - - // EnvVarAuthToken is the env var for the auth token sent by the launcher to - // the main instance in exchange for a single-use grant token. - // nolint:gosec // G101: False positive - EnvVarAuthToken = "N8N_RUNNERS_AUTH_TOKEN" - // EnvVarGrantToken is the env var for the single-use grant token returned by // the main instance to the launcher in exchange for the auth token. // nolint:gosec // G101: False positive EnvVarGrantToken = "N8N_RUNNERS_GRANT_TOKEN" - // ------------------------ - // task broker - // ------------------------ - - // EnvVarTaskBrokerServerURI is the env var for the URI of the - // task broker server, typically at http://127.0.0.1:5679. Typically - // the broker server runs inside an n8n instance (main or worker). - EnvVarTaskBrokerServerURI = "N8N_TASK_BROKER_URI" - - // ------------------------ - // runner - // ------------------------ - - // EnvVarIdleTimeout is the env var for how long (in seconds) a runner may be - // idle for before exit. - EnvVarIdleTimeout = "N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT" + // EnvVarAutoShutdownTimeout is the env var for how long (in seconds) a runner + // may be idle for before exit. + EnvVarAutoShutdownTimeout = "N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT" ) const ( - defaultIdleTimeoutValue = "15" // seconds - DefaultMainServerURI = "http://127.0.0.1:5678" - DefaultTaskBrokerServerURI = "http://127.0.0.1:5679" - // URI of the runner. Used for monitoring the runner's health RunnerServerURI = "http://127.0.0.1:5681" ) -// AllowedOnly filters the current environment down to only those -// environment variables in the allow list. -func AllowedOnly(allowed []string) []string { +// allowedOnly filters the current environment down to only those +// environment variables in the allowlist. +func allowedOnly(allowlist []string) []string { var filtered []string for _, env := range os.Environ() { @@ -64,7 +36,7 @@ func AllowedOnly(allowed []string) []string { } key := parts[0] - for _, allowedKey := range allowed { + for _, allowedKey := range allowlist { if key == allowedKey { filtered = append(filtered, env) break @@ -100,62 +72,14 @@ func Clear(envVars []string, envVarName string) []string { return result } -func ValidateURL(urlStr string, fieldName string) error { - u, err := url.Parse(urlStr) - - if err != nil { - return fmt.Errorf("%s must be a valid URL: %w", fieldName, err) - } - - if u.Scheme == "localhost" { - // edge case: `url.Parse` parses scheme in `localhost:5678` to be `localhost` - return fmt.Errorf("%s must include a scheme, e.g. http://", fieldName) - } - - return nil -} +// PrepareRunnerEnv prepares the environment variables to pass to the runner. +func PrepareRunnerEnv(cfg *config.Config) []string { + defaultEnvs := []string{"LANG", "PATH", "TZ", "TERM"} + allowedEnvs := append(defaultEnvs, cfg.Runner.AllowedEnv...) -// EnvConfig holds validated environment variable values. -// nolint:revive // exported -type EnvConfig struct { - AuthToken string - TaskBrokerServerURI string -} - -// FromEnv retrieves vars from the environment, validates their values, and -// returns a Config holding the validated values, or a slice of errors. -func FromEnv() (*EnvConfig, error) { - var errs []error - - authToken := os.Getenv(EnvVarAuthToken) - taskBrokerServerURI := os.Getenv(EnvVarTaskBrokerServerURI) - idleTimeout := os.Getenv(EnvVarIdleTimeout) - - if authToken == "" { - errs = append(errs, fmt.Errorf("%s is required", EnvVarAuthToken)) - } - - if taskBrokerServerURI == "" { - taskBrokerServerURI = DefaultTaskBrokerServerURI - } else if err := ValidateURL(taskBrokerServerURI, EnvVarTaskBrokerServerURI); err != nil { - errs = append(errs, err) - } - - if idleTimeout == "" { - os.Setenv(EnvVarIdleTimeout, defaultIdleTimeoutValue) - } else { - idleTimeoutInt, err := strconv.Atoi(idleTimeout) - if err != nil || idleTimeoutInt < 0 { - errs = append(errs, fmt.Errorf("%s must be a non-negative integer", EnvVarIdleTimeout)) - } - } - - if len(errs) > 0 { - return nil, errors.Join(errs...) - } + runnerEnv := allowedOnly(allowedEnvs) + runnerEnv = append(runnerEnv, "N8N_RUNNERS_SERVER_ENABLED=true") + runnerEnv = append(runnerEnv, fmt.Sprintf("%s=%s", EnvVarAutoShutdownTimeout, cfg.AutoShutdownTimeout)) - return &EnvConfig{ - AuthToken: authToken, - TaskBrokerServerURI: taskBrokerServerURI, - }, nil + return runnerEnv } diff --git a/internal/env/env_test.go b/internal/env/env_test.go index 29e0359..66fab2f 100644 --- a/internal/env/env_test.go +++ b/internal/env/env_test.go @@ -3,6 +3,8 @@ package env import ( "os" "reflect" + "sort" + "task-runner-launcher/internal/config" "testing" ) @@ -54,7 +56,7 @@ func TestAllowedOnly(t *testing.T) { os.Setenv(k, v) } - got := AllowedOnly(tt.allowed) + got := allowedOnly(tt.allowed) if tt.expected == nil && len(got) == 0 { return @@ -143,114 +145,103 @@ func TestClear(t *testing.T) { } } -func TestFromEnv(t *testing.T) { +func TestPrepareRunnerEnv(t *testing.T) { tests := []struct { - name string - envVars map[string]string - expectError bool - expected *EnvConfig + name string + config *config.Config + envSetup map[string]string + expected []string + setupFunc func() + cleanFunc func() }{ { - name: "valid custom configuration", - envVars: map[string]string{ - EnvVarAuthToken: "token123", - EnvVarTaskBrokerServerURI: "http://localhost:9001", - EnvVarIdleTimeout: "30", + name: "includes default and allowed env vars", + config: &config.Config{ + AutoShutdownTimeout: "15", + Runner: &config.RunnerConfig{ + AllowedEnv: []string{"CUSTOM_VAR1", "CUSTOM_VAR2"}, + }, }, - expected: &EnvConfig{ - AuthToken: "token123", - TaskBrokerServerURI: "http://localhost:9001", + envSetup: map[string]string{ + "PATH": "/usr/bin", + "LANG": "en_US.UTF-8", + "TZ": "UTC", + "TERM": "xterm", + "CUSTOM_VAR1": "value1", + "CUSTOM_VAR2": "value2", + "RESTRICTED": "should-not-appear", }, - }, - { - name: "missing auth token", - envVars: map[string]string{ - EnvVarTaskBrokerServerURI: "http://localhost:5679", + expected: []string{ + "CUSTOM_VAR1=value1", + "CUSTOM_VAR2=value2", + "LANG=en_US.UTF-8", + "N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT=15", + "N8N_RUNNERS_SERVER_ENABLED=true", + "PATH=/usr/bin", + "TERM=xterm", + "TZ=UTC", }, - expectError: true, }, { - name: "invalid task broker server URI", - envVars: map[string]string{ - EnvVarAuthToken: "token123", - EnvVarTaskBrokerServerURI: "http://\\invalid:5679", + name: "handles empty allowed env list", + config: &config.Config{ + AutoShutdownTimeout: "15", + Runner: &config.RunnerConfig{ + AllowedEnv: []string{}, + }, }, - expectError: true, - }, - { - name: "missing task broker server URI", - envVars: map[string]string{ - EnvVarAuthToken: "token123", + envSetup: map[string]string{ + "PATH": "/usr/bin", + "LANG": "en_US.UTF-8", + "RESTRICTED": "should-not-appear", }, - expected: &EnvConfig{ - AuthToken: "token123", - TaskBrokerServerURI: DefaultTaskBrokerServerURI, + expected: []string{ + "LANG=en_US.UTF-8", + "N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT=15", + "N8N_RUNNERS_SERVER_ENABLED=true", + "PATH=/usr/bin", }, }, { - name: "missing scheme in 127.0.0.1 URI", - envVars: map[string]string{ - EnvVarAuthToken: "token123", - EnvVarTaskBrokerServerURI: "127.0.0.1:5679", + name: "handles custom auto-shutdown timeout", + config: &config.Config{ + AutoShutdownTimeout: "30", + Runner: &config.RunnerConfig{ + AllowedEnv: []string{}, + }, }, - expectError: true, - }, - { - name: "missing scheme in localhost URI", - envVars: map[string]string{ - EnvVarAuthToken: "token123", - EnvVarTaskBrokerServerURI: "localhost:5679", + envSetup: map[string]string{ + "PATH": "/usr/bin", + "N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT": "30", }, - expectError: true, - }, - { - name: "invalid idle timeout", - envVars: map[string]string{ - EnvVarAuthToken: "token123", - EnvVarTaskBrokerServerURI: "http://localhost:5679", - EnvVarIdleTimeout: "invalid", - }, - expectError: true, - }, - { - name: "negative idle timeout", - envVars: map[string]string{ - EnvVarAuthToken: "token123", - EnvVarTaskBrokerServerURI: "http://localhost:5679", - EnvVarIdleTimeout: "-1", + expected: []string{ + "N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT=30", + "N8N_RUNNERS_SERVER_ENABLED=true", + "PATH=/usr/bin", }, - expectError: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { os.Clearenv() - for k, v := range tt.envVars { + for k, v := range tt.envSetup { os.Setenv(k, v) } - envCfg, err := FromEnv() - - if tt.expectError { - if err == nil { - t.Error("FromEnv() expected error, got nil") - } - return + if tt.setupFunc != nil { + tt.setupFunc() } - if err != nil { - t.Errorf("FromEnv() unexpected error: %v", err) - return - } + got := PrepareRunnerEnv(tt.config) + sort.Strings(got) - if envCfg == nil { - t.Error("FromEnv() returned nil config") - return + if !reflect.DeepEqual(got, tt.expected) { + t.Errorf("PrepareRunnerEnv() =\ngot: %v\nwant: %v", got, tt.expected) } - if !reflect.DeepEqual(envCfg, tt.expected) { - t.Errorf("FromEnv() = %+v, want %+v", envCfg, tt.expected) + if tt.cleanFunc != nil { + tt.cleanFunc() } }) } diff --git a/internal/errorreporting/sentry.go b/internal/errorreporting/sentry.go index b3b5222..f226a8b 100644 --- a/internal/errorreporting/sentry.go +++ b/internal/errorreporting/sentry.go @@ -2,87 +2,45 @@ package errorreporting import ( "os" - "task-runner-launcher/internal/env" + "task-runner-launcher/internal/config" "task-runner-launcher/internal/logs" "time" "github.com/getsentry/sentry-go" ) -// Configuration options for Sentry. A subset of what is defined in -// https://docs.sentry.io/platforms/go/configuration/options/ -type Config struct { - IsEnabled bool - Dsn string - Release string - Environment string - DeploymentName string -} +var ( + sentryInit = sentry.Init + sentryFlush = sentry.Flush + osExit = os.Exit +) -// Init initializes the Sentry client using configuration from the environment. +// Init initializes the Sentry client using given configuration. // If SENTRY_DSN env var is not set, Sentry will be disabled. -func Init() { - config := ConfigFromEnv() - if !config.IsEnabled { +func Init(sentryCfg *config.SentryConfig) { + if !sentryCfg.IsEnabled { return } logs.Debug("Initializing Sentry") - err := sentry.Init(sentry.ClientOptions{ - Dsn: config.Dsn, - ServerName: config.DeploymentName, - Release: config.Release, - Environment: config.Environment, + err := sentryInit(sentry.ClientOptions{ + Dsn: sentryCfg.Dsn, + ServerName: sentryCfg.DeploymentName, + Release: sentryCfg.Release, + Environment: sentryCfg.Environment, Debug: false, EnableTracing: false, }) if err != nil { logs.Errorf("Sentry failed to initialize: %v", err) - os.Exit(1) + osExit(1) } -} -func Close() { - sentry.Flush(2 * time.Second) + logs.Debug("Initialized Sentry") } -func ConfigFromEnv() Config { - config := Config{ - IsEnabled: true, - } - - dsn := os.Getenv("SENTRY_DSN") - if dsn == "" { - config.IsEnabled = false - return config - } - - err := env.ValidateURL(dsn, "SENTRY_DSN") - if err != nil { - logs.Errorf("Invalid Sentry DSN: %v", err) - config.IsEnabled = false - return config - } - - config.Dsn = dsn - config.DeploymentName = os.Getenv("DEPLOYMENT_NAME") - config.Environment = os.Getenv("ENVIRONMENT") - config.Release = os.Getenv("N8N_VERSION") - - if config.DeploymentName == "" { - logs.Warn("DEPLOYMENT_NAME is not set. Using 'task-runner-launcher'.") - config.DeploymentName = "task-runner-launcher" - } - if config.Environment == "" { - logs.Warn("ENVIRONMENT is not set. Using 'unknown'.") - config.Environment = "unknown" - } - if config.Release == "" { - logs.Warn("N8N_VERSION is not set. Using 'unknown'.") - config.Release = "unknown" - } - - return config +func Close() { + sentryFlush(2 * time.Second) } diff --git a/internal/errorreporting/sentry_test.go b/internal/errorreporting/sentry_test.go index cedb813..c34d30a 100644 --- a/internal/errorreporting/sentry_test.go +++ b/internal/errorreporting/sentry_test.go @@ -1,76 +1,94 @@ package errorreporting import ( - "os" + "errors" + "task-runner-launcher/internal/config" "testing" + "time" + + "github.com/getsentry/sentry-go" + "github.com/stretchr/testify/assert" ) -func TestConfigFromEnv(t *testing.T) { +func TestInit(t *testing.T) { tests := []struct { name string - envVars map[string]string - expectedConfig Config + config *config.SentryConfig + expectInit bool + expectPanic bool + mockSentryInit func(options sentry.ClientOptions) error }{ { - name: "Sentry disabled when DSN is empty", - envVars: map[string]string{ - "SENTRY_DSN": "", - }, - expectedConfig: Config{IsEnabled: false}, - }, - { - name: "Sentry disabled when DSN is invalid", - envVars: map[string]string{ - "SENTRY_DSN": "http://\\invalid", + name: "should not initialize when disabled", + config: &config.SentryConfig{ + IsEnabled: false, }, - expectedConfig: Config{IsEnabled: false}, + expectInit: false, }, { - name: "Sentry enabled with valid config", - envVars: map[string]string{ - "SENTRY_DSN": "http://example.com", - "DEPLOYMENT_NAME": "test-deployment", - "ENVIRONMENT": "test-env", - "N8N_VERSION": "1.0.0", - }, - expectedConfig: Config{ + name: "should initialize with valid config", + config: &config.SentryConfig{ IsEnabled: true, - Dsn: "http://example.com", + Dsn: "https://test@sentry.io/123", DeploymentName: "test-deployment", - Environment: "test-env", Release: "1.0.0", + Environment: "test-environment", + }, + expectInit: true, + mockSentryInit: func(options sentry.ClientOptions) error { + assert.Equal(t, "https://test@sentry.io/123", options.Dsn) + assert.Equal(t, "test-deployment", options.ServerName) + assert.Equal(t, "1.0.0", options.Release) + assert.Equal(t, "test-environment", options.Environment) + assert.False(t, options.Debug) + assert.False(t, options.EnableTracing) + return nil }, }, { - name: "Sentry enabled with missing config", - envVars: map[string]string{ - "SENTRY_DSN": "http://example.com", + name: "should handle initialization error", + config: &config.SentryConfig{ + IsEnabled: true, + Dsn: "invalid-dsn", }, - expectedConfig: Config{ - IsEnabled: true, - Dsn: "http://example.com", - DeploymentName: "task-runner-launcher", - Environment: "unknown", - Release: "unknown", + expectInit: true, + expectPanic: true, + mockSentryInit: func(_ sentry.ClientOptions) error { + return errors.New("oh no") }, }, } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - for key, value := range test.envVars { - os.Setenv(key, value) - } - - config := ConfigFromEnv() - - if config != test.expectedConfig { - t.Errorf("got %+v, want %+v", config, test.expectedConfig) - } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.expectPanic { + originalOsExit := osExit + defer func() { osExit = originalOsExit }() + exitCalled := false + osExit = func(code int) { + exitCalled = true + assert.Equal(t, 1, code) + } - for key := range test.envVars { - os.Unsetenv(key) + Init(tt.config) + assert.True(t, exitCalled, "expected os.Exit to be called") + } else { + Init(tt.config) } }) } } + +func TestClose(t *testing.T) { + flushCalled := false + expectedDuration := 2 * time.Second + + sentryFlush = func(timeout time.Duration) bool { + flushCalled = true + assert.Equal(t, expectedDuration, timeout) + return true + } + + Close() + assert.True(t, flushCalled, "expected sentry.Flush to be called") +} diff --git a/internal/errs/errs.go b/internal/errs/errs.go index 8ce074f..6e97ff4 100644 --- a/internal/errs/errs.go +++ b/internal/errs/errs.go @@ -9,4 +9,12 @@ var ( // ErrWsMsgTooLarge is returned when the websocket message is too large for // the launcher's websocket buffer. ErrWsMsgTooLarge = errors.New("websocket message too large for buffer - please increase buffer size") + + ErrNonIntegerAutoShutdownTimeout = errors.New("invalid auto-shutdown timeout - N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT must be a valid integer") + + // ErrNegativeAutoShutdownTimeout is returned when the auto shutdown timeout is a negative integer. + ErrNegativeAutoShutdownTimeout = errors.New("negative auto-shutdown timeout - N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT must be >= 0") + + // ErrMissingRunnerConfig is returned when the config file does not contain any runner configs. + ErrMissingRunnerConfig = errors.New("found no task runner configs at /etc/n8n-task-runners.json") )