Skip to content

Commit

Permalink
POC: watch config files to load/unload maps
Browse files Browse the repository at this point in the history
  • Loading branch information
jchamberlain committed Sep 1, 2023
1 parent f5aa603 commit 09528bb
Show file tree
Hide file tree
Showing 27 changed files with 3,577 additions and 29 deletions.
47 changes: 47 additions & 0 deletions atlas/atlas.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package atlas

import (
"context"
"fmt"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -221,6 +222,52 @@ func (a *Atlas) AddMap(m Map) {
a.maps[m.Name] = m
}

// AddMaps registers maps by name, all or nothing. If a map already exists an error will be returned.
func (a *Atlas) AddMaps(maps []Map) error {
if a == nil {
// Use the default Atlas if a, is nil. This way the empty value is
// still useful.
return defaultAtlas.AddMaps(maps)
}
a.Lock()
defer a.Unlock()

if a.maps == nil {
a.maps = map[string]Map{}
}

// Check all the names for conflicts before we add any map, so that we can add all or none.
for _, m := range maps {
if _, exists := a.maps[m.Name]; exists {
return fmt.Errorf("Map with name \"%s\" already exists.", m.Name)
}
}

// Now add all the maps.
for _, m := range maps {
a.maps[m.Name] = m
}

return nil
}

func (a *Atlas) RemoveMaps(names []string) {
if a == nil {
// Use the default Atlas if a, is nil. This way the empty value is
// still useful.
defaultAtlas.RemoveMaps(names)
return
}
a.Lock()
defer a.Unlock()

for _, name := range names {
if _, exists := a.maps[name]; exists {
delete(a.maps, name)
}
}
}

// GetCache returns the registered cache if one is registered, otherwise nil
func (a *Atlas) GetCache() cache.Interface {
if a == nil {
Expand Down
11 changes: 9 additions & 2 deletions cmd/internal/register/maps.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func Maps(a *atlas.Atlas, maps []provider.Map, providers map[string]provider.Til
)

// iterate our maps
newMaps := make([]atlas.Map, 0, len(maps))
for _, m := range maps {
newMap := webMercatorMapFromConfigMap(m)

Expand All @@ -157,9 +158,15 @@ func Maps(a *atlas.Atlas, maps []provider.Map, providers map[string]provider.Til
newMap.Layers = append(newMap.Layers, layer)
}

a.AddMap(newMap)
newMaps = append(newMaps, newMap)
}
return nil

// Register all or nothing.
return a.AddMaps(newMaps)
}

func UnloadMaps(a *atlas.Atlas, names []string) {
a.RemoveMaps(names)
}

// Find allow HTML tag
Expand Down
147 changes: 136 additions & 11 deletions cmd/tegola/cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package cmd

import (
"context"
"fmt"

"github.com/go-spatial/cobra"
"github.com/go-spatial/tegola/atlas"
"github.com/go-spatial/tegola/cmd/internal/register"
cachecmd "github.com/go-spatial/tegola/cmd/tegola/cmd/cache"
"github.com/go-spatial/tegola/config"
"github.com/go-spatial/tegola/config/source"
"github.com/go-spatial/tegola/dict"
"github.com/go-spatial/tegola/internal/build"
"github.com/go-spatial/tegola/internal/env"
"github.com/go-spatial/tegola/internal/log"
"github.com/go-spatial/tegola/provider"
)

var (
Expand Down Expand Up @@ -114,22 +118,22 @@ func initConfig(configFile string, cacheRequired bool, logLevel string, logger s
return err
}

// init our providers
// but first convert []env.Map -> []dict.Dicter
provArr := make([]dict.Dicter, len(conf.Providers))
for i := range provArr {
provArr[i] = conf.Providers[i]
// Init providers from the primary config file.
providers, err := initProviders(conf.Providers, conf.Maps)
if err != nil {
return err
}

providers, err := register.Providers(provArr, conf.Maps)
if err != nil {
return fmt.Errorf("could not register providers: %v", err)
// Init maps from the primary config file.
if err = initMaps(conf.Maps, providers); err != nil {
return err
}

// init our maps
if err = register.Maps(nil, conf.Maps, providers); err != nil {
return fmt.Errorf("could not register maps: %v", err)
// Setup the app config source.
if err = initAppConfigSource(conf); err != nil {
return err
}

if len(conf.Cache) == 0 && cacheRequired {
return fmt.Errorf("no cache defined in config, please check your config (%v)", configFile)
}
Expand All @@ -152,3 +156,124 @@ func initConfig(configFile string, cacheRequired bool, logLevel string, logger s
atlas.SetObservability(observer)
return nil
}

// initProviders translate provider config from a TOML file into usable Provider objects.
func initProviders(providersConfig []env.Dict, maps []provider.Map) (map[string]provider.TilerUnion, error) {
// first convert []env.Map -> []dict.Dicter
provArr := make([]dict.Dicter, len(providersConfig))
for i := range provArr {
provArr[i] = providersConfig[i]
}

providers, err := register.Providers(provArr, conf.Maps)
if err != nil {
return nil, fmt.Errorf("could not register providers: %v", err)
}

return providers, nil
}

// initMaps registers maps with Atlas to be ready for service.
func initMaps(maps []provider.Map, providers map[string]provider.TilerUnion) error {
if err := register.Maps(nil, maps, providers); err != nil {
return fmt.Errorf("could not register maps: %v", err)
}

return nil
}

// initAppConfigSource sets up an additional configuration source for "apps" (groups of providers and maps) to be loaded and unloaded on-the-fly.
func initAppConfigSource(conf config.Config) error {
// Get the config source type. If none, return.
val, err := conf.AppConfigSource.String("type", nil)
if err != nil || val == "" {
return nil
}

// Initialize the source.
ctx := context.Background() // Not doing anything with context now, but could use it for stopping this goroutine.
src, err := source.InitSource(val, conf.AppConfigSource, conf.BaseDir)
if err != nil {
return err
}

// Load and start watching for new apps.
watcher, err := src.LoadAndWatch(ctx)
if err != nil {
return err
}

go func() {
// Keep a record of what we've loaded so that we can unload when needed.
apps := make(map[string]source.App)

for {
select {
case app, ok := <-watcher.Updates:
if !ok {
return
}

// Check for validity first.
if err := config.ValidateApp(&app); err != nil {
log.Errorf("Failed validating app %s. %s", app.Key, err)
continue
}

// If the new app is named the same as an existing app, first unload the existing one.
if old, exists := apps[app.Key]; exists {
log.Infof("Unloading app %s...", old.Key)
// We need only unload maps, since the providers don't live outside of maps.
register.UnloadMaps(nil, getMapNames(old))
delete(apps, app.Key)
}

log.Infof("Loading app %s...", app.Key)

// Init new providers
providers, err := initProviders(app.Providers, app.Maps)
if err != nil {
log.Errorf("Failed initializing providers from %s: %s", app.Key, err)
continue
}

// Init new maps
if err = initMaps(app.Maps, providers); err != nil {
log.Errorf("Failed initializing maps from %s: %s", app.Key, err)
continue
}

// Record that we've loaded this app.
apps[app.Key] = app

case deleted, ok := <-watcher.Deletions:
if !ok {
return
}

// Unload an app's maps if it was previously loaded.
if app, exists := apps[deleted]; exists {
log.Infof("Unloading app %s...", app.Key)
register.UnloadMaps(nil, getMapNames(app))
delete(apps, app.Key)
} else {
log.Infof("Received an unload event for app %s, but couldn't find it.", deleted)
}

case <-ctx.Done():
return
}
}
}()

return nil
}

func getMapNames(app source.App) []string {
names := make([]string, 0, len(app.Maps))
for _, m := range app.Maps {
names = append(names, string(m.Name))
}

return names
}
Loading

0 comments on commit 09528bb

Please sign in to comment.