Skip to content

Commit

Permalink
Ensure that annotation enabled traffic-agents are uninstalled.
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Hallgren <[email protected]>
  • Loading branch information
thallgren committed Feb 3, 2025
1 parent 1f733e7 commit 8ecec22
Showing 1 changed file with 34 additions and 29 deletions.
63 changes: 34 additions & 29 deletions cmd/traffic/cmd/manager/mutator/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"slices"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -101,7 +102,7 @@ func (c *configWatcher) isRolloutNeeded(ctx context.Context, wl k8sapi.Workload,
if ia, ok := podMeta.GetAnnotations()[agentconfig.InjectAnnotation]; ok {
// Annotation controls injection, so no explicit rollout is needed unless the deployment was added before the traffic-manager.
// If the annotation changes, there will be an implicit rollout anyway.
if wl.GetCreationTimestamp().After(c.startedAt) {
if wl.GetCreationTimestamp().After(c.startedAt) && c.running.Load() {
dlog.Debugf(ctx, "Rollout of %s.%s is not necessary. Pod template has inject annotation %s",
wl.GetName(), wl.GetNamespace(), ia)
return false
Expand Down Expand Up @@ -424,6 +425,7 @@ type configWatcher struct {
nsLocks *xsync.MapOf[string, *sync.RWMutex]
blacklistedPods *xsync.MapOf[string, time.Time]
startedAt time.Time
running atomic.Bool
rolloutDisabled bool

cms []cache.SharedIndexInformer
Expand Down Expand Up @@ -538,6 +540,7 @@ func (c *configWatcher) SetSelf(self Map) {
}

func (c *configWatcher) StartWatchers(ctx context.Context) error {
defer c.running.Store(true)
c.startedAt = time.Now()
ctx, c.cancel = context.WithCancel(ctx)
for _, si := range c.svs {
Expand Down Expand Up @@ -857,36 +860,38 @@ func (c *configWatcher) Start(ctx context.Context) {
}

func (c *configWatcher) DeleteMapsAndRolloutAll(ctx context.Context) {
c.cancel() // No more updates from watcher
now := meta.NewDeleteOptions(0)
api := k8sapi.GetK8sInterface(ctx).CoreV1()
c.nsLocks.Range(func(ns string, lock *sync.RWMutex) bool {
lock.Lock()
defer lock.Unlock()
wlm, err := data(ctx, ns)
if err != nil {
dlog.Errorf(ctx, "unable to get configmap %s.%s: %v", agentconfig.ConfigMap, ns, err)
return true
}
for k, v := range wlm {
e := &entry{name: k, namespace: ns, value: v}
scx, wl, err := e.workload(ctx)
if c.running.CompareAndSwap(true, false) {
c.cancel() // No more updates from watcher
now := meta.NewDeleteOptions(0)
api := k8sapi.GetK8sInterface(ctx).CoreV1()
c.nsLocks.Range(func(ns string, lock *sync.RWMutex) bool {
lock.Lock()
defer lock.Unlock()
wlm, err := data(ctx, ns)
if err != nil {
if !errors.IsNotFound(err) {
dlog.Errorf(ctx, "unable to get workload for %s.%s %s: %v", k, ns, v, err)
dlog.Errorf(ctx, "unable to get configmap %s.%s: %v", agentconfig.ConfigMap, ns, err)
return true
}
for k, v := range wlm {
e := &entry{name: k, namespace: ns, value: v}
scx, wl, err := e.workload(ctx)
if err != nil {
if !errors.IsNotFound(err) {
dlog.Errorf(ctx, "unable to get workload for %s.%s %s: %v", k, ns, v, err)
}
continue
}
continue
ac := scx.AgentConfig()
if ac.Create || ac.Manual {
// Deleted before it was generated or manually added, just ignore
continue
}
c.triggerRollout(ctx, wl, nil)
}
ac := scx.AgentConfig()
if ac.Create || ac.Manual {
// Deleted before it was generated or manually added, just ignore
continue
if err := api.ConfigMaps(ns).Delete(ctx, agentconfig.ConfigMap, *now); err != nil {
dlog.Errorf(ctx, "unable to delete ConfigMap %s-%s: %v", agentconfig.ConfigMap, ns, err)
}
c.triggerRollout(ctx, wl, nil)
}
if err := api.ConfigMaps(ns).Delete(ctx, agentconfig.ConfigMap, *now); err != nil {
dlog.Errorf(ctx, "unable to delete ConfigMap %s-%s: %v", agentconfig.ConfigMap, ns, err)
}
return true
})
return true
})
}
}

0 comments on commit 8ecec22

Please sign in to comment.