diff --git a/internal/goverseer/watcher/gce_metadata_watcher/gce_metadata_watcher.go b/internal/goverseer/watcher/gce_metadata_watcher/gce_metadata_watcher.go new file mode 100644 index 0000000..d4f6aa2 --- /dev/null +++ b/internal/goverseer/watcher/gce_metadata_watcher/gce_metadata_watcher.go @@ -0,0 +1,282 @@ +package gce_metadata_watcher + +import ( + "context" + "fmt" + "io" + "log/slog" + "net/http" + "time" + + "github.com/lmittmann/tint" + "github.com/simplifi/goverseer/internal/goverseer/config" +) + +const ( + // ValidSourceInstance is the string value for an instance metadata source + ValidSourceInstance = "instance" + + // ValidSourceProject is the string value for a project metadata source + ValidSourceProject = "project" + + // DefaultSource is the default metadata source + DefaultSource = ValidSourceInstance + + // DefaultRecursive is the default value for the recursive flag + // it can be overridden by setting the recursive flag in the config + DefaultRecursive = false + + // DefaultMetadataUrl is the default URL for GCE metadata + DefaultMetadataUrl = "http://metadata.google.internal/computeMetadata/v1" + + // DefaultMetadataErrorWaitSeconds is the default number of seconds to wait + // before retrying a failed metadata request + DefaultMetadataErrorWaitSeconds = 1 +) + +// GceMetadataWatcherConfig is the configuration for a GCE metadata watcher +type GceMetadataWatcherConfig struct { + // Source is the metadata source to watch + // Valid values are 'instance' and 'project' + // Default is 'instance' + Source string + + // Key is the key to watch in the GCE metadata + // This is required config value + Key string + + // Recursive is whether to recurse the metadata keys + // Default is false + Recursive bool + + // MetadataUrl is the URL this watcher will use when reading from the GCE + // metadata server + // It can be useful to override during testing + // e.g. http://localhost:8888/computeMetadata/v1 + MetadataUrl string + + // MetadataErrorWaitSeconds is the number of seconds to wait before retrying + // a failed metadata request. This prevents hammering the metadata server. + // Default is 1 second + MetadataErrorWaitSeconds int +} + +// ParseConfig parses the config for the watcher +// It validates the config, sets defaults if missing, and returns the config +func ParseConfig(config interface{}) (*GceMetadataWatcherConfig, error) { + cfgMap, ok := config.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("invalid config") + } + + cfg := &GceMetadataWatcherConfig{ + Source: DefaultSource, + Recursive: DefaultRecursive, + MetadataUrl: DefaultMetadataUrl, + MetadataErrorWaitSeconds: DefaultMetadataErrorWaitSeconds, + } + + // If source is set, it should be one of the valid sources + if cfgMap["source"] != nil { + if source, ok := cfgMap["source"].(string); ok { + if source != ValidSourceInstance && source != ValidSourceProject { + return nil, fmt.Errorf("source must be one of %s or %s", ValidSourceInstance, ValidSourceProject) + } + cfg.Source = source + } else if cfgMap["source"] != nil { + return nil, fmt.Errorf("source must be a string") + } + } + + // If recursive is set, it should be a boolean + if cfgMap["recursive"] != nil { + if recursive, ok := cfgMap["recursive"].(bool); ok { + cfg.Recursive = recursive + } else if cfgMap["recursive"] != nil { + return nil, fmt.Errorf("recursive must be a boolean") + } + } + + // Key is required and must be a string + if key, ok := cfgMap["key"].(string); ok { + if key == "" { + return nil, fmt.Errorf("key must not be empty") + } + cfg.Key = key + } else if cfgMap["key"] != nil { + return nil, fmt.Errorf("key must be a string") + } else { + return nil, fmt.Errorf("key is required") + } + + // If metadata_url is set, it should be a string + if cfgMap["metadata_url"] != nil { + if metadataUrl, ok := cfgMap["metadata_url"].(string); ok { + if metadataUrl == "" { + return nil, fmt.Errorf("metadata_url must not be empty") + } + cfg.MetadataUrl = metadataUrl + } else if cfgMap["metadata_url"] != nil { + return nil, fmt.Errorf("metadata_url must be a string") + } + } + + // If metadata_error_wait_seconds is set, it should be an integer + if cfgMap["metadata_error_wait_seconds"] != nil { + if metadataErrorWaitSeconds, ok := cfgMap["metadata_error_wait_seconds"].(int); ok { + cfg.MetadataErrorWaitSeconds = metadataErrorWaitSeconds + } else if cfgMap["metadata_error_wait_seconds"] != nil { + return nil, fmt.Errorf("metadata_error_wait_seconds must be an integer") + } + } + + return cfg, nil +} + +type GceMetadataWatcher struct { + // Key is the key to watch in the GCE metadata + Key string + + // Recursive is whether to recurse the metadata keys + Recursive bool + + // MetadataUrl is the URL this watcher will use when reading from the GCE + // metadata server + MetadataUrl string + + // MetadataErrorWait is the time to wait before trying failed metadata request + MetadataErrorWait time.Duration + + // lastETag is the last etag, used to compare changes + lastETag string + + // log is the logger + log *slog.Logger + + // ctx is the context + ctx context.Context + + // cancel is the cancel function used to stop the watcher + cancel context.CancelFunc +} + +// New creates a new GceMetadataWatcher based on the passed config +func New(cfg config.Config, log *slog.Logger) (*GceMetadataWatcher, error) { + pcfg, err := ParseConfig(cfg.Watcher.Config) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(context.Background()) + + return &GceMetadataWatcher{ + Key: pcfg.Key, + Recursive: pcfg.Recursive, + MetadataUrl: pcfg.MetadataUrl, + MetadataErrorWait: time.Duration(pcfg.MetadataErrorWaitSeconds) * time.Second, + log: log, + ctx: ctx, + cancel: cancel, + }, nil +} + +// gceMetadataResponse is the response from the GCE metadata server +type gceMetadataResponse struct { + // etag is the etag of the metadata + // used to compare changes + etag string + + // body is the body of the metadata + body string +} + +// getMetadata gets the metadata from the GCE metadata server +// It returns the metadata response or an error +func (w *GceMetadataWatcher) getMetadata() (*gceMetadataResponse, error) { + client := http.Client{ + Timeout: 0, // No timeout (infinite) + } + + urlWithKey := fmt.Sprintf("%s/%s", w.MetadataUrl, w.Key) + req, err := http.NewRequestWithContext(w.ctx, "GET", urlWithKey, nil) + if err != nil { + return nil, err + } + + req.Header.Set("Metadata-Flavor", "Google") + q := req.URL.Query() + q.Add("wait_for_change", "true") + q.Add("recursive", fmt.Sprintf("%v", w.Recursive)) + req.URL.RawQuery = q.Encode() + + resp, err := client.Do(req) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("status: %s", resp.Status) + } + + body, err := io.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + return nil, err + } + + return &gceMetadataResponse{ + etag: resp.Header.Get("ETag"), + body: string(body), + }, nil +} + +// Watch watches the GCE metadata for changes and sends value to changes channel +// The changes channel is where the value is sent when it changes +func (w *GceMetadataWatcher) Watch(change chan interface{}) { + w.log.Info("starting watcher") + + for { + select { + case <-w.ctx.Done(): + return + default: + gceMetadata, err := w.getMetadata() + if err != nil { + // Avoid logging errors if the context was canceled mid-request + // This will happen when the watcher is stopped + if w.ctx.Err() == context.Canceled { + continue + } + + w.log.Error("error getting metadata", + tint.Err(err)) + + // Usually getMetadata opens up a connection to the metadata server + // and waits for a change. If there is an error we want to wait for a + // bit before trying again to prevent hammering the metadata server. + // Since we're in a for loop here the retrys will come VERY fast without + // this sleep. + time.Sleep(w.MetadataErrorWait) + continue + } + + // Only send a change if it has actually changed by comparing etags + if w.lastETag != gceMetadata.etag { + w.log.Info("change detected", + "key", w.Key, + "etag", gceMetadata.etag, + "previous_etag", w.lastETag) + + change <- gceMetadata.body + + w.lastETag = gceMetadata.etag + } + } + } +} + +// Stop signals the watcher to stop +func (w *GceMetadataWatcher) Stop() { + w.log.Info("shutting down watcher") + w.cancel() +} diff --git a/internal/goverseer/watcher/gce_metadata_watcher/gce_metadata_watcher_test.go b/internal/goverseer/watcher/gce_metadata_watcher/gce_metadata_watcher_test.go new file mode 100644 index 0000000..427b34c --- /dev/null +++ b/internal/goverseer/watcher/gce_metadata_watcher/gce_metadata_watcher_test.go @@ -0,0 +1,288 @@ +package gce_metadata_watcher + +import ( + "context" + "log/slog" + "net/http" + "net/http/httptest" + "os" + "sync" + "testing" + "time" + + "github.com/lmittmann/tint" + "github.com/simplifi/goverseer/internal/goverseer/config" + "github.com/stretchr/testify/assert" +) + +// TestParseConfig tests the ParseConfig function +func TestParseConfig(t *testing.T) { + var parsedConfig *GceMetadataWatcherConfig + + testKey := "valid" + parsedConfig, err := ParseConfig(map[string]interface{}{ + "key": testKey, + }) + assert.NoError(t, err, + "Parsing a valid config should not return an error") + assert.Equal(t, testKey, parsedConfig.Key, + "Key should be set to the value in the config") + assert.Equal(t, DefaultSource, parsedConfig.Source, + "Source should be set to the default") + assert.Equal(t, DefaultRecursive, parsedConfig.Recursive, + "Recursive should be set to the default") + assert.Equal(t, DefaultMetadataUrl, parsedConfig.MetadataUrl, + "MetadataUrl should be set to the default") + assert.Equal(t, DefaultMetadataErrorWaitSeconds, parsedConfig.MetadataErrorWaitSeconds, + "MetadataErrorWaitSeconds should be set to the default") + + // Test setting the source + parsedConfig, err = ParseConfig(map[string]interface{}{ + "key": testKey, + "source": "instance", + }) + assert.NoError(t, err, + "Parsing a config with a valid source should not return an error") + assert.Equal(t, "instance", parsedConfig.Source, + "Source should be set to the value in the config") + + _, err = ParseConfig(map[string]interface{}{ + "key": testKey, + "source": "this-is-wrong", + }) + assert.Error(t, err, + "Parsing a config with an incorrect source value should return an error") + + _, err = ParseConfig(map[string]interface{}{ + "key": testKey, + "source": 1, + }) + assert.Error(t, err, + "Parsing a config with an incorrect source type should return an error") + + // Test setting recursive + parsedConfig, err = ParseConfig(map[string]interface{}{ + "key": testKey, + "recursive": true, + }) + assert.NoError(t, err, + "Parsing a config with a valid recursive value should not return an error") + assert.Equal(t, true, parsedConfig.Recursive, + "Recursive should be set to the value in the config") + + _, err = ParseConfig(map[string]interface{}{ + "key": testKey, + "recursive": 1, + }) + assert.Error(t, err, + "Parsing a config with an incorrect recursive type should return an error") + + // Test setting key + parsedConfig, err = ParseConfig(map[string]interface{}{ + "key": testKey, + }) + assert.NoError(t, err, + "Parsing a config with a valid key should not return an error") + assert.Equal(t, testKey, parsedConfig.Key, + "Key should be set to the value in the config") + + _, err = ParseConfig(map[string]interface{}{ + "key": nil, + }) + assert.Error(t, err, + "Parsing a config with no key should return an error") + + _, err = ParseConfig(map[string]interface{}{ + "key": "", + }) + assert.Error(t, err, + "Parsing a config with an empty key should return an error") + + _, err = ParseConfig(map[string]interface{}{ + "key": 1, + }) + assert.Error(t, err, + "Parsing a config with an incorrect key type should return an error") + + // Test setting the metadata_url + parsedConfig, err = ParseConfig(map[string]interface{}{ + "key": testKey, + "metadata_url": "http://localhost:8888/computeMetadata/v1", + }) + assert.NoError(t, err, + "Parsing a config with a valid metadata_url should not return an error") + assert.Equal(t, "http://localhost:8888/computeMetadata/v1", parsedConfig.MetadataUrl, + "MetadataUrl should be set to the value in the config") + + _, err = ParseConfig(map[string]interface{}{ + "key": testKey, + "metadata_url": "", + }) + assert.Error(t, err, + "Parsing a config with an empty metadata_url should return an error") + + _, err = ParseConfig(map[string]interface{}{ + "key": testKey, + "metadata_url": 1, + }) + assert.Error(t, err, + "Parsing a config with an incorrect metadata_url type should return an error") + + // Test setting the metadata_error_wait_seconds + parsedConfig, err = ParseConfig(map[string]interface{}{ + "key": testKey, + "metadata_error_wait_seconds": 10, + }) + assert.NoError(t, err, + "Parsing a config with a valid metadata_error_wait_seconds should not return an error") + assert.Equal(t, 10, parsedConfig.MetadataErrorWaitSeconds, + "MetadataUrl should be set to the value in the config") + + _, err = ParseConfig(map[string]interface{}{ + "key": testKey, + "metadata_url": "", + }) + assert.Error(t, err, + "Parsing a config with an empty metadata_url should return an error") + + _, err = ParseConfig(map[string]interface{}{ + "key": testKey, + "metadata_url": 1, + }) + assert.Error(t, err, + "Parsing a config with an incorrect metadata_url type should return an error") +} + +// TestNew tests the New function +func TestNew(t *testing.T) { + var cfg config.Config + cfg = config.Config{ + Name: "TestConfig", + Watcher: config.WatcherConfig{ + Type: "gce_metadata", + Config: map[string]interface{}{ + "key": "test", + }, + }, + } + watcher, err := New(cfg, nil) + assert.NoError(t, err, + "Creating a new GceMetadataWatcher should not return an error") + assert.NotNil(t, watcher, + "Creating a new GceMetadataWatcher should return a watcher") + + cfg = config.Config{ + Name: "TestConfig", + Watcher: config.WatcherConfig{ + Type: "gce_metadata", + Config: map[string]interface{}{ + "key": nil, + }, + }, + } + watcher, err = New(cfg, nil) + assert.Error(t, err, + "Creating a new GceMetadataWatcher with an invalid config should return an error") + assert.Nil(t, watcher, + "Creating a new GceMetadataWatcher with an invalid config should not return a watcher") +} + +func TestGceMetadataWatcher_Watch(t *testing.T) { + log := slog.New(tint.NewHandler(os.Stderr, &tint.Options{Level: slog.LevelError})) + mockResponseChan := make(chan struct{}) + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Wait for a signal, this is used to simulate a change in the metadata + // value on GCE during tests + <-mockResponseChan + w.Header().Add("ETag", "mock-etag") + w.WriteHeader(http.StatusOK) + w.Write([]byte("mock response")) + })) + defer mockServer.Close() + + ctx, cancel := context.WithCancel(context.Background()) + + watcher := GceMetadataWatcher{ + Key: "test", + Recursive: true, + MetadataUrl: mockServer.URL, + MetadataErrorWait: 1 * time.Second, + log: log, + ctx: ctx, + cancel: cancel, + } + + changes := make(chan interface{}) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + watcher.Watch(changes) + }() + + // Simulate a change by closing the mock response channel, which will unblock + // the request + close(mockResponseChan) + + // Assert that the change was received + select { + case value := <-changes: + assert.Equal(t, "mock response", value) + case <-time.After(1 * time.Second): + assert.Fail(t, "Timed out waiting for change") + } + + // Stop the watcher and wait + watcher.Stop() + wg.Wait() +} + +func TestGceMetadataWatcher_Stop(t *testing.T) { + log := slog.New(tint.NewHandler(os.Stderr, &tint.Options{Level: slog.LevelError})) + mockResponseChan := make(chan struct{}) + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Wait for a signal, this is used to simulate a change in the metadata + // value on GCE during tests + <-mockResponseChan + w.Header().Add("ETag", "mock-etag") + w.WriteHeader(http.StatusOK) + w.Write([]byte("mock response")) + })) + defer mockServer.Close() + + ctx, cancel := context.WithCancel(context.Background()) + + watcher := GceMetadataWatcher{ + Key: "test", + Recursive: true, + MetadataUrl: mockServer.URL, + MetadataErrorWait: 1 * time.Second, + log: log, + ctx: ctx, + cancel: cancel, + } + + changes := make(chan interface{}) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + watcher.Watch(changes) + }() + + // Stop the watcher and wait + watcher.Stop() + wg.Wait() + + // Simulate a change by closing the mock response channel, which will unblock + // the request + close(mockResponseChan) + + // Assert that the change was NOT received + select { + case <-changes: + assert.Fail(t, "Received change after stopping watcher") + case <-time.After(1 * time.Second): + // Success + } +} diff --git a/internal/goverseer/watcher/watcher.go b/internal/goverseer/watcher/watcher.go index 05145af..f4d694c 100644 --- a/internal/goverseer/watcher/watcher.go +++ b/internal/goverseer/watcher/watcher.go @@ -7,6 +7,7 @@ import ( "github.com/lmittmann/tint" "github.com/simplifi/goverseer/internal/goverseer/config" + "github.com/simplifi/goverseer/internal/goverseer/watcher/gce_metadata_watcher" "github.com/simplifi/goverseer/internal/goverseer/watcher/time_watcher" ) @@ -28,6 +29,8 @@ func New(cfg *config.Config) (Watcher, error) { switch cfg.Watcher.Type { case "time": return time_watcher.New(*cfg, logger) + case "gce_metadata": + return gce_metadata_watcher.New(*cfg, logger) default: return nil, fmt.Errorf("unknown watcher type: %s", cfg.Watcher.Type) }