diff --git a/cmd/launcher/main.go b/cmd/launcher/main.go index a1acd17..ef2df7d 100644 --- a/cmd/launcher/main.go +++ b/cmd/launcher/main.go @@ -40,19 +40,18 @@ func main() { errorreporting.Init(cfg.Sentry) defer errorreporting.Close() - srv := http.NewHealthCheckServer() + srv := http.NewHealthCheckServer(cfg.HealthCheckServerPort) go func() { - logs.Infof("Starting health check server at port %d", http.GetPort()) - if err := srv.ListenAndServe(); err != nil { errMsg := "Health check server failed to start" if opErr, ok := err.(*net.OpError); ok && opErr.Op == "listen" { - errMsg = fmt.Sprintf("%s: Port %d is already in use", errMsg, http.GetPort()) + errMsg = fmt.Sprintf("%s: Port %s is already in use", errMsg, srv.Addr) } else { errMsg = fmt.Sprintf("%s: %s", errMsg, err) } logs.Error(errMsg) } + logs.Infof("Started launcher's health check server at port %d", srv.Addr) }() cmd := &commands.LaunchCommand{} diff --git a/internal/config/config.go b/internal/config/config.go index 9dbe70e..241f4e6 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -17,6 +17,11 @@ var configPath = "/etc/n8n-task-runners.json" var cfg Config +const ( + // EnvVarHealthCheckPort is the env var for the port for the launcher's health check server. + EnvVarHealthCheckPort = "N8N_LAUNCHER_HEALTCHECK_PORT" +) + // Config holds the full configuration for the launcher. type Config struct { // LogLevel is the log level for the launcher. Default: `info`. @@ -33,6 +38,9 @@ type Config struct { // TaskBrokerURI is the URI of the task broker server. TaskBrokerURI string `env:"N8N_TASK_BROKER_URI, default=http://127.0.0.1:5679"` + // HealthCheckServerPort is the port for the launcher's health check server. + HealthCheckServerPort string `env:"N8N_LAUNCHER_HEALTCHECK_PORT, default=5680"` + // Runner is the runner config for the task runner, obtained from: // `/etc/n8n-task-runners.json`. Runner *RunnerConfig @@ -92,6 +100,10 @@ func LoadConfig(runnerType string, lookuper envconfig.Lookuper) (*Config, error) cfgErrs = append(cfgErrs, errs.ErrNegativeAutoShutdownTimeout) } + if port, err := strconv.Atoi(cfg.HealthCheckServerPort); err != nil || port <= 0 || port >= 65536 { + cfgErrs = append(cfgErrs, fmt.Errorf("%s must be a valid port number", EnvVarHealthCheckPort)) + } + // runner runnerCfg, err := readFileConfig(runnerType) diff --git a/internal/http/healthcheck_server.go b/internal/http/healthcheck_server.go index 979b86a..6d9c52b 100644 --- a/internal/http/healthcheck_server.go +++ b/internal/http/healthcheck_server.go @@ -4,26 +4,22 @@ import ( "encoding/json" "fmt" "net/http" - "os" - "strconv" "task-runner-launcher/internal/logs" "time" ) const ( - defaultPort = 5680 - portEnvVar = "N8N_LAUNCHER_HEALTCHECK_PORT" healthCheckPath = "/healthz" readTimeout = 1 * time.Second writeTimeout = 1 * time.Second ) -func NewHealthCheckServer() *http.Server { +func NewHealthCheckServer(port string) *http.Server { mux := http.NewServeMux() mux.HandleFunc(healthCheckPath, handleHealthCheck) return &http.Server{ - Addr: fmt.Sprintf(":%d", GetPort()), + Addr: fmt.Sprintf(":%s", port), Handler: mux, ReadTimeout: readTimeout, WriteTimeout: writeTimeout, @@ -48,14 +44,3 @@ func handleHealthCheck(w http.ResponseWriter, r *http.Request) { return } } - -func GetPort() int { - if customPortStr := os.Getenv(portEnvVar); customPortStr != "" { - if customPort, err := strconv.Atoi(customPortStr); err == nil && customPort > 0 && customPort < 65536 { - return customPort - } - logs.Warnf("%s sets an invalid port, falling back to default port %d", portEnvVar, defaultPort) - } - - return defaultPort -} diff --git a/internal/http/healthcheck_server_test.go b/internal/http/healthcheck_server_test.go index a99f556..b41450d 100644 --- a/internal/http/healthcheck_server_test.go +++ b/internal/http/healthcheck_server_test.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" "net/http/httptest" - "os" "testing" ) @@ -94,7 +93,7 @@ func (w *failingWriter) WriteHeader(statusCode int) { } func TestNewHealthCheckServer(t *testing.T) { - server := NewHealthCheckServer() + server := NewHealthCheckServer("5680") if server == nil { t.Fatal("NewHealthCheckServer() returned nil") @@ -113,54 +112,3 @@ func TestNewHealthCheckServer(t *testing.T) { t.Errorf("NewHealthCheckServer() writeTimeout = %v, want %v", server.WriteTimeout, writeTimeout) } } - -func TestGetPort(t *testing.T) { - tests := []struct { - name string - envPort string - expectedPort int - }{ - { - name: "returns default port when env var is not set", - envPort: "", - expectedPort: defaultPort, - }, - { - name: "returns custom port when valid env var is set", - envPort: "8080", - expectedPort: 8080, - }, - { - name: "returns default port when env var is negative", - envPort: "-1", - expectedPort: defaultPort, - }, - { - name: "returns default port when env var is zero", - envPort: "0", - expectedPort: defaultPort, - }, - { - name: "returns default port when env var is too large", - envPort: "65536", - expectedPort: defaultPort, - }, - { - name: "returns default port when env var is not a number", - envPort: "invalid", - expectedPort: defaultPort, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - os.Setenv(portEnvVar, tt.envPort) - - if got := GetPort(); got != tt.expectedPort { - t.Errorf("GetPort() = %v, want %v", got, tt.expectedPort) - } - - os.Unsetenv(portEnvVar) - }) - } -}