Skip to content

Commit

Permalink
Ensure endpoints are healthy
Browse files Browse the repository at this point in the history
fleet server endpoint is built based on the elasticsearch endpoint

fix creation
  • Loading branch information
mrodm committed Jul 28, 2023
1 parent 1dcbe0a commit d0f9a9b
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 38 deletions.
33 changes: 28 additions & 5 deletions internal/stack/serverless.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package stack

import (
"context"
"errors"
"fmt"
"time"

"github.com/elastic/elastic-package/internal/logger"
"github.com/elastic/elastic-package/internal/profile"
Expand Down Expand Up @@ -43,6 +49,12 @@ func (sp *serverlessProvider) createProject(settings projectSettings, options Op
return Config{}, fmt.Errorf("failed to create %s project %s in %s: %w", settings.Type, settings.Name, settings.Region, err)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Minute*30)
defer cancel()
if err := sp.client.EnsureEndpoints(ctx, project); err != nil {
return Config{}, fmt.Errorf("failed to ensure endpoints have been provisioned properly: %w", err)
}

var config Config
config.Provider = ProviderServerless
config.Parameters = map[string]string{
Expand All @@ -53,13 +65,22 @@ func (sp *serverlessProvider) createProject(settings projectSettings, options Op
config.ElasticsearchHost = project.Endpoints.Elasticsearch
config.KibanaHost = project.Endpoints.Kibana

config.ElasticsearchUsername = project.Credentials.Username
config.ElasticsearchPassword = project.Credentials.Password

printUserConfig(options.Printer, config)

err = storeConfig(sp.profile, config)
if err != nil {
return Config{}, fmt.Errorf("failed to store config: %w", err)
}

logger.Debug("Waiting for creation plan to be completed")
err = project.EnsureHealthy(ctx)
if err != nil {
return Config{}, fmt.Errorf("not all services are healthy: %w", err)
}

return config, nil
}

Expand Down Expand Up @@ -150,6 +171,8 @@ func (sp *serverlessProvider) BootUp(options Options) error {
// if err != nil {
// return fmt.Errorf("failed to replace GeoIP databases: %w", err)
// }
logger.Debugf("Project created: %s", project.Name)
printUserConfig(options.Printer, config)
case nil:
logger.Debugf("Project existed: %s", project.Name)
printUserConfig(options.Printer, config)
Expand Down Expand Up @@ -209,7 +232,7 @@ func (sp *serverlessProvider) TearDown(options Options) error {
return fmt.Errorf("failed to find current project: %w", err)
}

logger.Debugf("Deleting project %q", project.ID)
logger.Debugf("Deleting project %q (%s)", project.Name, project.ID)

err = sp.deleteProject(project, options)
if err != nil {
Expand All @@ -222,10 +245,10 @@ func (sp *serverlessProvider) TearDown(options Options) error {
// return fmt.Errorf("failed to delete GeoIP extension: %w", err)
// }

err = storeConfig(sp.profile, Config{})
if err != nil {
return fmt.Errorf("failed to store config: %w", err)
}
// err = storeConfig(sp.profile, Config{})
// if err != nil {
// return fmt.Errorf("failed to store config: %w", err)
// }

return nil
}
Expand Down
64 changes: 61 additions & 3 deletions internal/stack/serverless/client.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package serverless

import (
Expand All @@ -7,9 +11,12 @@ import (
"errors"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"strings"
"time"

"github.com/elastic/elastic-package/internal/environment"
"github.com/elastic/elastic-package/internal/logger"
Expand All @@ -18,6 +25,9 @@ import (
type Client struct {
host string
apiKey string

username string
password string
}

// ClientOption is functional option modifying Serverless API client.
Expand Down Expand Up @@ -52,20 +62,34 @@ func NewClient(opts ...ClientOption) (*Client, error) {
return c, nil
}

// Address option sets the host to use to connect to Kibana.
// WithAddress option sets the host to use to connect to Kibana.
func WithAddress(address string) ClientOption {
return func(c *Client) {
c.host = address
}
}

// Address option sets the host to use to connect to Kibana.
// WithApiKey option sets the host to use to connect to Kibana.
func WithApiKey(apiKey string) ClientOption {
return func(c *Client) {
c.apiKey = apiKey
}
}

// WithUsername option sets the username.
func WithUsername(username string) ClientOption {
return func(c *Client) {
c.username = username
}
}

// WithPassword option sets the password.
func WithPassword(password string) ClientOption {
return func(c *Client) {
c.password = password
}
}

func (c *Client) get(ctx context.Context, resourcePath string) (int, []byte, error) {
return c.sendRequest(ctx, http.MethodGet, resourcePath, nil)
}
Expand Down Expand Up @@ -109,8 +133,13 @@ func (c *Client) newRequest(ctx context.Context, method, resourcePath string, re
}

req.Header.Add("content-type", "application/json")
req.Header.Add("Authorization", fmt.Sprintf("ApiKey %s", c.apiKey))

if c.username != "" {
req.SetBasicAuth(c.username, c.password)
return req, nil
}

req.Header.Add("Authorization", fmt.Sprintf("ApiKey %s", c.apiKey))
return req, nil
}

Expand Down Expand Up @@ -199,3 +228,32 @@ func (c *Client) GetProject(projectType, projectID string) (*Project, error) {
err = json.Unmarshal(respBody, &project)
return project, err
}

func (c *Client) EnsureEndpoints(ctx context.Context, project *Project) error {
timer := time.NewTimer(time.Millisecond)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
}

if project.Endpoints.Elasticsearch != "" {
if project.Endpoints.Fleet == "" {
logger.Debugf("Fleet Endpoint empty, setting it based on ES")
project.Endpoints.Fleet = strings.Replace(project.Endpoints.Elasticsearch, ".es.", ".fleet.", 1)
}
return nil
}

newProject, err := c.GetProject(project.Type, project.ID)
if err != nil {
log.Printf("request error: %w", err.Error())
timer.Reset(time.Second * 5)
continue
}

project.Endpoints = newProject.Endpoints
timer.Reset(time.Second * 5)
}
}
149 changes: 119 additions & 30 deletions internal/stack/serverless/project.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package serverless

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"time"

"github.com/elastic/elastic-package/internal/logger"
)

// Project represents a serverless project
Expand All @@ -16,6 +22,7 @@ type Project struct {

Name string `json:"name"`
ID string `json:"id"`
Alias string `json:"alias"`
Type string `json:"type"`
Region string `json:"region_id"`

Expand All @@ -32,44 +39,126 @@ type Project struct {
} `json:"endpoints"`
}

// NewObservabilityProject creates a new observability type project
func NewObservabilityProject(ctx context.Context, url, name, apiKey, region string) (*Project, error) {
return newProject(ctx, url, name, apiKey, region, "observability")
type serviceHealthy func(context.Context, *Project) error

func (p *Project) EnsureHealthy(ctx context.Context) error {
if err := p.ensureServiceHealthy(ctx, getESHealthy); err != nil {
return fmt.Errorf("elasticsearch not healthy: %w", err)
}
if err := p.ensureServiceHealthy(ctx, getKibanaHealthy); err != nil {
return fmt.Errorf("kibana not healthy: %w", err)
}
if err := p.ensureServiceHealthy(ctx, getFleetHealthy); err != nil {
return fmt.Errorf("fleet not healthy: %w", err)
}
return nil
}

// newProject creates a new serverless project
// Note that the Project.Endpoints may not be populated and another call may be required.
func newProject(ctx context.Context, url, name, apiKey, region, projectType string) (*Project, error) {
ReqBody := struct {
Name string `json:"name"`
RegionID string `json:"region_id"`
}{
Name: name,
RegionID: region,
}
p, err := json.Marshal(ReqBody)
func (p *Project) ensureServiceHealthy(ctx context.Context, serviceFunc serviceHealthy) error {
timer := time.NewTimer(time.Millisecond)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
}

err := serviceFunc(ctx, p)
if err != nil {
logger.Debugf("service not ready: %s", err.Error())
timer.Reset(time.Second * 5)
continue
}

return nil
}
return nil
}

func getESHealthy(ctx context.Context, project *Project) error {
client, err := NewClient(
WithAddress(project.Endpoints.Elasticsearch),
WithUsername(project.Credentials.Username),
WithPassword(project.Credentials.Password),
)
if err != nil {
return nil, err
return err
}
req, err := http.NewRequestWithContext(ctx, "POST", url+"/api/v1/serverless/projects/"+projectType, bytes.NewReader(p))

statusCode, respBody, err := client.get(ctx, "/_cluster/health")
if err != nil {
return nil, err
return fmt.Errorf("failed to query elasticsearch health: %w", err)
}

if statusCode != http.StatusOK {
return fmt.Errorf("unexpected status code %d, body: %s", statusCode, string(respBody))
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "ApiKey "+apiKey)

resp, err := http.DefaultClient.Do(req)
var health struct {
Status string `json:"status"`
}
if err := json.Unmarshal(respBody, &health); err != nil {
log.Printf("Unable to decode response: %v body: %s", err, string(respBody))
return err
}
if health.Status == "green" {
return nil
}
return fmt.Errorf("elasticsearch unhealthy: %s", health.Status)
}

func getKibanaHealthy(ctx context.Context, project *Project) error {
client, err := NewClient(
WithAddress(project.Endpoints.Kibana),
WithUsername(project.Credentials.Username),
WithPassword(project.Credentials.Password),
)
if err != nil {
return err
}

statusCode, respBody, err := client.get(ctx, "/api/status")
if err != nil {
return fmt.Errorf("failed to query kibana status: %w", err)
}
if statusCode != http.StatusOK {
return fmt.Errorf("unexpected status code %d, body: %s", statusCode, string(respBody))
}

var status struct {
Status struct {
Overall struct {
Level string `json:"level"`
} `json:"overall"`
} `json:"status"`
}
if err := json.Unmarshal(respBody, &status); err != nil {
log.Printf("Unable to decode response: %v body: %s", err, string(respBody))
return err
}
if status.Status.Overall.Level == "available" {
return nil
}
return fmt.Errorf("kibana unhealthy: %s", status.Status.Overall.Level)
}

func getFleetHealthy(ctx context.Context, project *Project) error {
client, err := NewClient(
WithAddress(project.Endpoints.Fleet),
WithUsername(project.Credentials.Username),
WithPassword(project.Credentials.Password),
)
if err != nil {
return nil, err
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusCreated {
p, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("unexpected status code %d, body: %s", resp.StatusCode, string(p))
statusCode, respBody, err := client.get(ctx, "/api/status")
if err != nil {
return fmt.Errorf("failed to query fleet status: %w", err)
}
if statusCode != http.StatusOK {
return fmt.Errorf("fleet unhealthy: status code %d, body: %s", statusCode, string(respBody))
}
project := &Project{url: url, apiKey: apiKey}

err = json.NewDecoder(resp.Body).Decode(project)
return project, err
return nil
}

0 comments on commit d0f9a9b

Please sign in to comment.