Skip to content

Commit

Permalink
Refactor to use new config style
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov committed Nov 28, 2024
1 parent c873399 commit 4cc37c8
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 74 deletions.
7 changes: 3 additions & 4 deletions cmd/launcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,18 @@ func main() {
errorreporting.Init(cfg.Sentry)
defer errorreporting.Close()

srv := http.NewHealthCheckServer()
srv := http.NewHealthCheckServer(cfg.HealthCheckServerPort)
go func() {
logs.Infof("Starting health check server at port %d", http.GetPort())

if err := srv.ListenAndServe(); err != nil {
errMsg := "Health check server failed to start"
if opErr, ok := err.(*net.OpError); ok && opErr.Op == "listen" {
errMsg = fmt.Sprintf("%s: Port %d is already in use", errMsg, http.GetPort())
errMsg = fmt.Sprintf("%s: Port %s is already in use", errMsg, srv.Addr)
} else {
errMsg = fmt.Sprintf("%s: %s", errMsg, err)
}
logs.Error(errMsg)
}
logs.Infof("Started launcher's health check server at port %d", srv.Addr)
}()

cmd := &commands.LaunchCommand{}
Expand Down
12 changes: 12 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ var configPath = "/etc/n8n-task-runners.json"

var cfg Config

const (
// EnvVarHealthCheckPort is the env var for the port for the launcher's health check server.
EnvVarHealthCheckPort = "N8N_LAUNCHER_HEALTCHECK_PORT"
)

// Config holds the full configuration for the launcher.
type Config struct {
// LogLevel is the log level for the launcher. Default: `info`.
Expand All @@ -33,6 +38,9 @@ type Config struct {
// TaskBrokerURI is the URI of the task broker server.
TaskBrokerURI string `env:"N8N_TASK_BROKER_URI, default=http://127.0.0.1:5679"`

// HealthCheckServerPort is the port for the launcher's health check server.
HealthCheckServerPort string `env:"N8N_LAUNCHER_HEALTCHECK_PORT, default=5680"`

// Runner is the runner config for the task runner, obtained from:
// `/etc/n8n-task-runners.json`.
Runner *RunnerConfig
Expand Down Expand Up @@ -92,6 +100,10 @@ func LoadConfig(runnerType string, lookuper envconfig.Lookuper) (*Config, error)
cfgErrs = append(cfgErrs, errs.ErrNegativeAutoShutdownTimeout)
}

if port, err := strconv.Atoi(cfg.HealthCheckServerPort); err != nil || port <= 0 || port >= 65536 {
cfgErrs = append(cfgErrs, fmt.Errorf("%s must be a valid port number", EnvVarHealthCheckPort))
}

// runner

runnerCfg, err := readFileConfig(runnerType)
Expand Down
19 changes: 2 additions & 17 deletions internal/http/healthcheck_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,22 @@ import (
"encoding/json"
"fmt"
"net/http"
"os"
"strconv"
"task-runner-launcher/internal/logs"
"time"
)

const (
defaultPort = 5680
portEnvVar = "N8N_LAUNCHER_HEALTCHECK_PORT"
healthCheckPath = "/healthz"
readTimeout = 1 * time.Second
writeTimeout = 1 * time.Second
)

func NewHealthCheckServer() *http.Server {
func NewHealthCheckServer(port string) *http.Server {
mux := http.NewServeMux()
mux.HandleFunc(healthCheckPath, handleHealthCheck)

return &http.Server{
Addr: fmt.Sprintf(":%d", GetPort()),
Addr: fmt.Sprintf(":%s", port),
Handler: mux,
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
Expand All @@ -48,14 +44,3 @@ func handleHealthCheck(w http.ResponseWriter, r *http.Request) {
return
}
}

func GetPort() int {
if customPortStr := os.Getenv(portEnvVar); customPortStr != "" {
if customPort, err := strconv.Atoi(customPortStr); err == nil && customPort > 0 && customPort < 65536 {
return customPort
}
logs.Warnf("%s sets an invalid port, falling back to default port %d", portEnvVar, defaultPort)
}

return defaultPort
}
54 changes: 1 addition & 53 deletions internal/http/healthcheck_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"os"
"testing"
)

Expand Down Expand Up @@ -94,7 +93,7 @@ func (w *failingWriter) WriteHeader(statusCode int) {
}

func TestNewHealthCheckServer(t *testing.T) {
server := NewHealthCheckServer()
server := NewHealthCheckServer("5680")

if server == nil {
t.Fatal("NewHealthCheckServer() returned nil")
Expand All @@ -113,54 +112,3 @@ func TestNewHealthCheckServer(t *testing.T) {
t.Errorf("NewHealthCheckServer() writeTimeout = %v, want %v", server.WriteTimeout, writeTimeout)
}
}

func TestGetPort(t *testing.T) {
tests := []struct {
name string
envPort string
expectedPort int
}{
{
name: "returns default port when env var is not set",
envPort: "",
expectedPort: defaultPort,
},
{
name: "returns custom port when valid env var is set",
envPort: "8080",
expectedPort: 8080,
},
{
name: "returns default port when env var is negative",
envPort: "-1",
expectedPort: defaultPort,
},
{
name: "returns default port when env var is zero",
envPort: "0",
expectedPort: defaultPort,
},
{
name: "returns default port when env var is too large",
envPort: "65536",
expectedPort: defaultPort,
},
{
name: "returns default port when env var is not a number",
envPort: "invalid",
expectedPort: defaultPort,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
os.Setenv(portEnvVar, tt.envPort)

if got := GetPort(); got != tt.expectedPort {
t.Errorf("GetPort() = %v, want %v", got, tt.expectedPort)
}

os.Unsetenv(portEnvVar)
})
}
}

0 comments on commit 4cc37c8

Please sign in to comment.