Skip to content

Commit

Permalink
Merge branch 'main' into add-coverage-for-runner-health-monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov committed Nov 28, 2024
2 parents 75c9f86 + c830f7a commit 8d3905f
Show file tree
Hide file tree
Showing 13 changed files with 551 additions and 459 deletions.
28 changes: 18 additions & 10 deletions cmd/launcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,37 @@ import (
"os"

"task-runner-launcher/internal/commands"
"task-runner-launcher/internal/config"
"task-runner-launcher/internal/errorreporting"
"task-runner-launcher/internal/http"
"task-runner-launcher/internal/logs"

"github.com/sethvargo/go-envconfig"
)

func main() {
logLevel := os.Getenv("N8N_LAUNCHER_LOG_LEVEL")

logs.SetLevel(logLevel) // default info

flag.Usage = func() {
logs.Infof("Usage: %s [runner-type]", os.Args[0])
fmt.Printf("Usage: %s [runner-type]\n", os.Args[0])
flag.PrintDefaults()
}

if len(os.Args) < 2 {
logs.Error("Missing runner-type argument")
os.Stderr.WriteString("Missing runner-type argument")
flag.Usage()
os.Exit(1)
}

errorreporting.Init()
runnerType := os.Args[1]

cfg, err := config.LoadConfig(runnerType, envconfig.OsLookuper())
if err != nil {
logs.Errorf("Failed to load config: %v", err)
os.Exit(1)
}

logs.SetLevel(cfg.LogLevel)

errorreporting.Init(cfg.Sentry)
defer errorreporting.Close()

srv := http.NewHealthCheckServer()
Expand All @@ -46,10 +55,9 @@ func main() {
}
}()

runnerType := os.Args[1]
cmd := &commands.LaunchCommand{RunnerType: runnerType}
cmd := &commands.LaunchCommand{}

if err := cmd.Execute(); err != nil {
if err := cmd.Execute(cfg); err != nil {
logs.Errorf("Failed to execute `launch` command: %s", err)
}
}
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@ go 1.23.3
require (
github.com/getsentry/sentry-go v0.29.1
github.com/gorilla/websocket v1.5.3
github.com/sethvargo/go-envconfig v1.1.0
github.com/stretchr/testify v1.8.2
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
15 changes: 13 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/getsentry/sentry-go v0.29.1 h1:DyZuChN8Hz3ARxGVV8ePaNXh1dQ7d76AiB117xcREwA=
github.com/getsentry/sentry-go v0.29.1/go.mod h1:x3AtIzN01d6SiWkderzaH28Tm0lgkafpJ5Bm3li39O0=
github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
Expand All @@ -14,11 +15,21 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sethvargo/go-envconfig v1.1.0 h1:cWZiJxeTm7AlCvzGXrEXaSTCNgip5oJepekh/BOQuog=
github.com/sethvargo/go-envconfig v1.1.0/go.mod h1:JLd0KFWQYzyENqnEPWWZ49i4vzZo/6nRidxI8YvGiHw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
64 changes: 22 additions & 42 deletions internal/commands/launch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,66 +20,46 @@ type Command interface {
Execute() error
}

type LaunchCommand struct {
RunnerType string
}

func (l *LaunchCommand) Execute() error {
logs.Info("Starting to execute `launch` command")
type LaunchCommand struct{}

// 0. validate env vars

envCfg, err := env.FromEnv()
if err != nil {
return fmt.Errorf("env vars failed validation: %w", err)
}

// 1. read runner config

runnerCfg, err := config.GetRunnerConfig(l.RunnerType)
if err != nil {
return fmt.Errorf("failed to get runner config: %w", err)
}
func (l *LaunchCommand) Execute(cfg *config.Config) error {
logs.Info("Starting launcher...")

// 2. change into working directory
// 1. change into working directory

if err := os.Chdir(runnerCfg.WorkDir); err != nil {
return fmt.Errorf("failed to chdir into configured dir (%s): %w", runnerCfg.WorkDir, err)
if err := os.Chdir(cfg.Runner.WorkDir); err != nil {
return fmt.Errorf("failed to chdir into configured dir (%s): %w", cfg.Runner.WorkDir, err)
}

logs.Debugf("Changed into working directory: %s", runnerCfg.WorkDir)
logs.Debugf("Changed into working directory: %s", cfg.Runner.WorkDir)

// 3. filter environment variables
// 2. prepare env vars to pass to runner

defaultEnvs := []string{"LANG", "PATH", "TZ", "TERM", env.EnvVarIdleTimeout}
allowedEnvs := append(defaultEnvs, runnerCfg.AllowedEnv...)
runnerEnv := env.AllowedOnly(allowedEnvs)
// Static values
runnerEnv = append(runnerEnv, "N8N_RUNNERS_SERVER_ENABLED=true")
runnerEnv := env.PrepareRunnerEnv(cfg)

logs.Debugf("Filtered environment variables")
logs.Debugf("Prepared env vars for runner")

for {
// 4. check until task broker is ready
// 3. check until task broker is ready

if err := http.CheckUntilBrokerReady(envCfg.TaskBrokerServerURI); err != nil {
if err := http.CheckUntilBrokerReady(cfg.TaskBrokerURI); err != nil {
return fmt.Errorf("encountered error while waiting for broker to be ready: %w", err)
}

// 5. fetch grant token for launcher
// 4. fetch grant token for launcher

launcherGrantToken, err := http.FetchGrantToken(envCfg.TaskBrokerServerURI, envCfg.AuthToken)
launcherGrantToken, err := http.FetchGrantToken(cfg.TaskBrokerURI, cfg.AuthToken)
if err != nil {
return fmt.Errorf("failed to fetch grant token for launcher: %w", err)
}

logs.Debug("Fetched grant token for launcher")

// 6. connect to main and wait for task offer to be accepted
// 5. connect to main and wait for task offer to be accepted

handshakeCfg := ws.HandshakeConfig{
TaskType: l.RunnerType,
TaskBrokerServerURI: envCfg.TaskBrokerServerURI,
TaskType: cfg.Runner.RunnerType,
TaskBrokerServerURI: cfg.TaskBrokerURI,
GrantToken: launcherGrantToken,
}

Expand All @@ -93,9 +73,9 @@ func (l *LaunchCommand) Execute() error {
return fmt.Errorf("handshake failed: %w", err)
}

// 7. fetch grant token for runner
// 6. fetch grant token for runner

runnerGrantToken, err := http.FetchGrantToken(envCfg.TaskBrokerServerURI, envCfg.AuthToken)
runnerGrantToken, err := http.FetchGrantToken(cfg.TaskBrokerURI, cfg.AuthToken)
if err != nil {
return fmt.Errorf("failed to fetch grant token for runner: %w", err)
}
Expand All @@ -107,14 +87,14 @@ func (l *LaunchCommand) Execute() error {
// 8. launch runner

logs.Debug("Task ready for pickup, launching runner...")
logs.Debugf("Command: %s", runnerCfg.Command)
logs.Debugf("Args: %v", runnerCfg.Args)
logs.Debugf("Command: %s", cfg.Runner.Command)
logs.Debugf("Args: %v", cfg.Runner.Args)
logs.Debugf("Env vars: %v", env.Keys(runnerEnv))

ctx, cancelHealthMonitor := context.WithCancel(context.Background())
var wg sync.WaitGroup

cmd := exec.CommandContext(ctx, runnerCfg.Command, runnerCfg.Args...)
cmd := exec.CommandContext(ctx, cfg.Runner.Command, cfg.Runner.Args...)
cmd.Env = runnerEnv
cmd.Stdout, cmd.Stderr = logs.GetRunnerWriters()

Expand Down
Loading

0 comments on commit 8d3905f

Please sign in to comment.