Skip to content

Commit

Permalink
Merge branch 'main' into add-coverage-for-runner-health-monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov committed Nov 28, 2024
2 parents 8d3905f + 97150cf commit 1522c91
Show file tree
Hide file tree
Showing 11 changed files with 350 additions and 57 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/checks.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
on: [push]

jobs:
audit:
checks:
runs-on: ubuntu-latest
steps:
- uses: actions/[email protected]
Expand All @@ -16,7 +16,7 @@ jobs:
version: latest

- name: Format check
run: go fmt ./...
run: make fmt-check

- name: Static analysis
run: go vet ./...
Expand Down
11 changes: 10 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: audit build lint lintfix run
.PHONY: audit build lint lintfix fmt fmt-check run test test-verbose test-coverage

check: lint
go fmt ./...
Expand All @@ -13,6 +13,15 @@ lint:
lintfix:
golangci-lint run --fix

fmt:
go fmt ./...

fmt-check:
@if [ -n "$$(go fmt ./...)" ]; then \
echo "Found unformatted Go files. Please run 'make fmt'"; \
exit 1; \
fi

run: build
./bin/main javascript

Expand Down
7 changes: 3 additions & 4 deletions cmd/launcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,18 @@ func main() {
errorreporting.Init(cfg.Sentry)
defer errorreporting.Close()

srv := http.NewHealthCheckServer()
srv := http.NewHealthCheckServer(cfg.HealthCheckServerPort)
go func() {
logs.Infof("Starting health check server at port %d", http.GetPort())

if err := srv.ListenAndServe(); err != nil {
errMsg := "Health check server failed to start"
if opErr, ok := err.(*net.OpError); ok && opErr.Op == "listen" {
errMsg = fmt.Sprintf("%s: Port %d is already in use", errMsg, http.GetPort())
errMsg = fmt.Sprintf("%s: Port %s is already in use", errMsg, srv.Addr)
} else {
errMsg = fmt.Sprintf("%s: %s", errMsg, err)
}
logs.Error(errMsg)
}
logs.Infof("Started launcher's health check server at port %d", srv.Addr)
}()

cmd := &commands.LaunchCommand{}
Expand Down
12 changes: 12 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ var configPath = "/etc/n8n-task-runners.json"

var cfg Config

const (
// EnvVarHealthCheckPort is the env var for the port for the launcher's health check server.
EnvVarHealthCheckPort = "N8N_LAUNCHER_HEALTCHECK_PORT"
)

// Config holds the full configuration for the launcher.
type Config struct {
// LogLevel is the log level for the launcher. Default: `info`.
Expand All @@ -33,6 +38,9 @@ type Config struct {
// TaskBrokerURI is the URI of the task broker server.
TaskBrokerURI string `env:"N8N_TASK_BROKER_URI, default=http://127.0.0.1:5679"`

// HealthCheckServerPort is the port for the launcher's health check server.
HealthCheckServerPort string `env:"N8N_LAUNCHER_HEALTCHECK_PORT, default=5680"`

// Runner is the runner config for the task runner, obtained from:
// `/etc/n8n-task-runners.json`.
Runner *RunnerConfig
Expand Down Expand Up @@ -92,6 +100,10 @@ func LoadConfig(runnerType string, lookuper envconfig.Lookuper) (*Config, error)
cfgErrs = append(cfgErrs, errs.ErrNegativeAutoShutdownTimeout)
}

if port, err := strconv.Atoi(cfg.HealthCheckServerPort); err != nil || port <= 0 || port >= 65536 {
cfgErrs = append(cfgErrs, fmt.Errorf("%s must be a valid port number", EnvVarHealthCheckPort))
}

// runner

runnerCfg, err := readFileConfig(runnerType)
Expand Down
20 changes: 10 additions & 10 deletions internal/http/check_until_broker_ready_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@ func TestCheckUntilBrokerReadyHappyPath(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
requestCount := 0
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestCount++
tt.serverFn(w, r, requestCount)
}))
defer server.Close()
defer srv.Close()

ctx, cancel := context.WithTimeout(context.Background(), tt.timeout)
defer cancel()

done := make(chan error)
go func() {
done <- CheckUntilBrokerReady(server.URL)
done <- CheckUntilBrokerReady(srv.URL)
}()

select {
Expand Down Expand Up @@ -83,11 +83,11 @@ func TestCheckUntilBrokerReadyErrors(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(tt.handler))
srv := httptest.NewServer(http.HandlerFunc(tt.handler))
if tt.name == "error - closed server" {
server.Close()
srv.Close()
} else {
defer server.Close()
defer srv.Close()
}

// CheckUntilBrokerReady retries forever, so set up
Expand All @@ -99,7 +99,7 @@ func TestCheckUntilBrokerReadyErrors(t *testing.T) {

brokerUnexpectedlyReady := make(chan error)
go func() {
brokerUnexpectedlyReady <- CheckUntilBrokerReady(server.URL)
brokerUnexpectedlyReady <- CheckUntilBrokerReady(srv.URL)
}()

select {
Expand Down Expand Up @@ -137,7 +137,7 @@ func TestSendReadinessRequest(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
t.Errorf("expected GET request, got %s", r.Method)
}
Expand All @@ -146,9 +146,9 @@ func TestSendReadinessRequest(t *testing.T) {
}
w.WriteHeader(tt.serverResponse)
}))
defer server.Close()
defer srv.Close()

resp, err := sendHealthRequest(server.URL)
resp, err := sendHealthRequest(srv.URL)

if err == nil {
defer resp.Body.Close()
Expand Down
10 changes: 5 additions & 5 deletions internal/http/fetch_grant_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ func sendGrantTokenRequest(taskBrokerServerURI, authToken string) (string, error
payload := map[string]string{"token": authToken}
payloadBytes, err := json.Marshal(payload)
if err != nil {
return "", err
return "", fmt.Errorf("failed to marshal grant token request: %w", err)
}

req, err := http.NewRequest("POST", url, bytes.NewBuffer(payloadBytes))
if err != nil {
return "", err
return "", fmt.Errorf("failed to create grant token request: %w", err)
}
req.Header.Set("Content-Type", "application/json")

Expand All @@ -48,9 +48,9 @@ func sendGrantTokenRequest(taskBrokerServerURI, authToken string) (string, error
return tokenResp.Data.Token, nil
}

// FetchGrantToken exchanges the launcher's auth token for a single-use
// grant token from the task broker. In case the task broker is
// temporarily unavailable, this exchange is retried a limited number of times.
// FetchGrantToken exchanges the launcher's auth token for a single-use grant
// token from the task broker. In case the task broker is temporarily
// unavailable, this exchange is retried a limited number of times.
func FetchGrantToken(taskBrokerServerURI, authToken string) (string, error) {
grantTokenFetch := func() (string, error) {
token, err := sendGrantTokenRequest(taskBrokerServerURI, authToken)
Expand Down
174 changes: 174 additions & 0 deletions internal/http/fetch_grant_token_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package http

import (
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"task-runner-launcher/internal/retry"
"testing"
"time"
)

func init() {
retry.DefaultMaxRetryTime = 50 * time.Millisecond
retry.DefaultMaxRetries = 3
retry.DefaultWaitTimeBetweenRetries = 10 * time.Millisecond
}

func TestFetchGrantToken(t *testing.T) {
tests := []struct {
name string
serverURL string
authToken string
serverFn func(w http.ResponseWriter, r *http.Request)
wantErr bool
errorContains string
}{
{
name: "successful request",
authToken: "test-token",
serverFn: func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(map[string]interface{}{
"data": map[string]string{
"token": "test-grant-token",
},
}); err != nil {
t.Errorf("Failed to encode response: %v", err)
}
},
},
{
name: "invalid response json",
authToken: "test-token",
serverFn: func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
if _, err := w.Write([]byte("invalid json")); err != nil {
t.Errorf("Failed to write response: %v", err)
}
},
wantErr: true,
errorContains: "failed to decode grant token response",
},
{
name: "server error",
authToken: "test-token",
serverFn: func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
},
wantErr: true,
errorContains: "status code 500",
},
{
name: "verify request body",
authToken: "test-auth-token",
serverFn: func(w http.ResponseWriter, r *http.Request) {
var body struct {
Token string `json:"token"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
t.Errorf("Failed to decode request body: %v", err)
}
if body.Token != "test-auth-token" {
t.Errorf("Expected auth token 'test-auth-token', got %q", body.Token)
}
if r.Header.Get("Content-Type") != "application/json" {
t.Errorf("Expected Content-Type 'application/json', got %q", r.Header.Get("Content-Type"))
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(map[string]interface{}{
"data": map[string]string{
"token": "test-grant-token",
},
}); err != nil {
t.Errorf("Failed to encode response: %v", err)
}
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(tt.serverFn))
defer srv.Close()

token, err := FetchGrantToken(srv.URL, tt.authToken)
hasErr := err != nil

if hasErr != tt.wantErr {
t.Errorf("FetchGrantToken() error = %v, wantErr %v", err, tt.wantErr)
return
}

if hasErr && tt.wantErr && !strings.Contains(err.Error(), tt.errorContains) {
t.Errorf("Expected error containing %q, got %v", tt.errorContains, err)
}

if !tt.wantErr && token == "" {
t.Error("Expected non-empty token for successful request")
}
})
}
}

func TestFetchGrantTokenInvalidURL(t *testing.T) {
_, err := FetchGrantToken("not-a-valid-url", "test-token")
if err == nil {
t.Error("Expected error for invalid URL, got nil")
}
}

func TestFetchGrantTokenRetry(t *testing.T) {
tryCount := 0
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
tryCount++
if tryCount < 2 {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(map[string]interface{}{
"data": map[string]string{
"token": "test-grant-token",
},
}); err != nil {
t.Errorf("Failed to encode response: %v", err)
}
}))
defer srv.Close()

token, err := FetchGrantToken(srv.URL, "test-token")
if err != nil {
t.Errorf("FetchGrantToken() unexpected error = %v", err)
}
if token == "" {
t.Error("Expected non-empty token after retry")
}
if tryCount != 2 {
t.Errorf("Expected 2 attempts, got %d", tryCount)
}
}

func TestFetchGrantTokenConnectionFailure(t *testing.T) {
invalidServerURL := "http://localhost:1"

token, err := FetchGrantToken(invalidServerURL, "test-token")

if err == nil {
t.Error("Expected error for connection failure, got nil")
}

if !strings.Contains(err.Error(), "connection refused") {
t.Errorf("Expected error containing 'connection refused', got %v", err)
}

if token != "" {
t.Errorf("Expected empty token for failed connection, got %q", token)
}
}
Loading

0 comments on commit 1522c91

Please sign in to comment.