Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reload Configuration only when a pod matching the label set in mounted-file is created or deleted #347

Merged
merged 4 commits into from
Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 104 additions & 20 deletions config-reloader/datasource/kube_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package datasource
import (
"context"
"fmt"
"github.com/vmware/kube-fluentd-operator/config-reloader/fluentd"
"github.com/vmware/kube-fluentd-operator/config-reloader/util"
"os"
"sort"
"strings"
"time"

"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -25,14 +28,16 @@ import (
)

type kubeInformerConnection struct {
client kubernetes.Interface
hashes map[string]string
cfg *config.Config
kubeds kubedatasource.KubeDS
nslist listerv1.NamespaceLister
podlist listerv1.PodLister
cmlist listerv1.ConfigMapLister
fdlist kfoListersV1beta1.FluentdConfigLister
client kubernetes.Interface
confHashes map[string]string
mountedLabels map[string][]map[string]string
cfg *config.Config
kubeds kubedatasource.KubeDS
nslist listerv1.NamespaceLister
podlist listerv1.PodLister
cmlist listerv1.ConfigMapLister
fdlist kfoListersV1beta1.FluentdConfigLister
updateChan chan time.Time
}

// NewKubernetesInformerDatasource builds a new Datasource from the provided config.
Expand Down Expand Up @@ -101,16 +106,31 @@ func NewKubernetesInformerDatasource(ctx context.Context, cfg *config.Config, up
}
logrus.Infof("Synced local informer with upstream Kubernetes API")

return &kubeInformerConnection{
client: client,
hashes: make(map[string]string),
cfg: cfg,
kubeds: kubeds,
nslist: namespaceLister,
podlist: podLister,
cmlist: cmLister,
fdlist: fluentdconfigDSLister.Fdlist,
}, nil
kubeInfoCx := &kubeInformerConnection{
client: client,
confHashes: make(map[string]string),
mountedLabels: make(map[string][]map[string]string),
cfg: cfg,
kubeds: kubeds,
nslist: namespaceLister,
podlist: podLister,
cmlist: cmLister,
fdlist: fluentdconfigDSLister.Fdlist,
updateChan: updateChan,
}

factory.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
kubeInfoCx.handlePodChange(ctx, obj)
},
UpdateFunc: func(old, obj interface{}) {
},
DeleteFunc: func(obj interface{}) {
kubeInfoCx.handlePodChange(ctx, obj)
},
})

return kubeInfoCx, nil
}

// GetNamespaces queries the configured Kubernetes API to generate a list of NamespaceConfig objects.
Expand All @@ -136,6 +156,26 @@ func (d *kubeInformerConnection) GetNamespaces(ctx context.Context) ([]*Namespac
return nil, err
}

fragment, err := fluentd.ParseString(configdata)
if err != nil {
return nil, err
}

var mountedLabels []map[string]string
for _, frag := range fragment {
if frag.Name == "source" && frag.Type() == "mounted-file" {
paramLabels := frag.Param("labels")
paramLabels = util.TrimTrailingComment(paramLabels)
currLabels, err := util.ParseTagToLabels(fmt.Sprintf("$labels(%s)", paramLabels))
if err != nil {
return nil, err
}
mountedLabels = append(mountedLabels, currLabels)
}
}

d.updateMountedLabels(ns, mountedLabels)

// Create a compact representation of the pods running in the namespace
// under consideration
pods, err := d.podlist.Pods(ns).List(labels.NewSelector())
Expand All @@ -155,7 +195,7 @@ func (d *kubeInformerConnection) GetNamespaces(ctx context.Context) ([]*Namespac
nsconfigs = append(nsconfigs, &NamespaceConfig{
Name: ns,
FluentdConfig: configdata,
PreviousConfigHash: d.hashes[ns],
PreviousConfigHash: d.confHashes[ns],
Labels: nsobj.Labels,
MiniContainers: minis,
})
Expand All @@ -166,7 +206,11 @@ func (d *kubeInformerConnection) GetNamespaces(ctx context.Context) ([]*Namespac

// WriteCurrentConfigHash is a setter for the hashtable maintained by this Datasource
func (d *kubeInformerConnection) WriteCurrentConfigHash(namespace string, hash string) {
d.hashes[namespace] = hash
d.confHashes[namespace] = hash
}

func (d *kubeInformerConnection) updateMountedLabels(namespace string, labels []map[string]string) {
d.mountedLabels[namespace] = labels
}

// UpdateStatus updates a namespace's status annotation with the latest result
Expand Down Expand Up @@ -239,6 +283,13 @@ func (d *kubeInformerConnection) discoverNamespaces(ctx context.Context) ([]stri
for _, cfmap := range confMapsList {
if cfmap.ObjectMeta.Name == d.cfg.DefaultConfigmapName {
namespaces = append(namespaces, cfmap.ObjectMeta.Namespace)
} else {
// We need to find configmaps that honor the global annotation for configmaps:
configMapNamespace, _ := d.nslist.Get(cfmap.ObjectMeta.Namespace)
configMapName := configMapNamespace.Annotations[d.cfg.AnnotConfigmapName]
if configMapName != "" {
namespaces = append(namespaces, cfmap.ObjectMeta.Namespace)
}
}
}
if d.cfg.CRDMigrationMode {
Expand Down Expand Up @@ -275,6 +326,39 @@ func (d *kubeInformerConnection) discoverNamespaces(ctx context.Context) ([]stri
return nsList, nil
}

func (d *kubeInformerConnection) handlePodChange(ctx context.Context, obj interface{}) {
mObj := obj.(*core.Pod)
logrus.Infof("Detected pod change %s in namespace: %s", mObj.GetName(), mObj.GetNamespace())
configdata, err := d.kubeds.GetFluentdConfig(ctx, mObj.GetNamespace())
nsConfigStr := fmt.Sprintf("%#v", configdata)

if err == nil {
if strings.Contains(nsConfigStr, "mounted-file") {
podLabels := mObj.GetLabels()
mountedLabel := d.mountedLabels[mObj.GetNamespace()]
for _, container := range mObj.Spec.Containers {
if matchAny(podLabels, mountedLabel, container.Name) {
logrus.Infof("Detected mounted-file pod change %s in namespace: %s", mObj.GetName(), mObj.GetNamespace())
select {
case d.updateChan <- time.Now():
default:
}
}
}
}
}
}

func matchAny(contLabels map[string]string, mountedLabelsInNs []map[string]string, name string) bool {
for _, mountedLabels := range mountedLabelsInNs {
if util.Match(mountedLabels, contLabels, name) {
return true
}
}

return false
}

func (d *kubeInformerConnection) discoverFluentdConfigNamespaces() ([]string, error) {
if d.fdlist == nil {
return nil, fmt.Errorf("Failed to initialize the fluentdconfig crd client, d.fclient = nil")
Expand Down
69 changes: 7 additions & 62 deletions config-reloader/processors/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package processors

import (
"bytes"
"errors"
"fmt"
"reflect"
"regexp"
Expand All @@ -16,11 +15,6 @@ import (
"github.com/vmware/kube-fluentd-operator/config-reloader/util"
)

const (
macroLabels = "$labels"
containerLabel = "_container"
)

type expandLabelsMacroState struct {
BaseProcessorState
}
Expand All @@ -33,9 +27,6 @@ var reSafe = regexp.MustCompile(`[.-]|^$`)
// an alphanumeric character (e.g. 'MyValue', or 'my_value', or '12345', regex used for validation is
// '(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?'

var reValidLabelName = regexp.MustCompile(`^([A-Za-z0-9][-A-Za-z0-9\/_.]*)?[A-Za-z0-9]$`)
var reValidLabelValue = regexp.MustCompile(`^(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?$`)

var fns = template.FuncMap{
"last": func(x int, a interface{}) bool {
return x == reflect.ValueOf(a).Len()-1
Expand Down Expand Up @@ -67,56 +58,10 @@ var retagTemplate = template.Must(template.New("retagTemplate").Funcs(fns).Parse
</filter>
`))

func parseTagToLabels(tag string) (map[string]string, error) {
if !strings.HasPrefix(tag, macroLabels+"(") &&
!strings.HasSuffix(tag, ")") {
return nil, fmt.Errorf("bad $labels macro use: %s", tag)
}

labelsOnly := tag[len(macroLabels)+1 : len(tag)-1]

result := map[string]string{}

records := strings.Split(labelsOnly, ",")
for _, rec := range records {
if rec == "" {
// be generous
continue
}
kv := strings.Split(rec, "=")
if len(kv) != 2 {
return nil, fmt.Errorf("bad label definition: %s", kv)
}

k := util.Trim(kv[0])
if k != containerLabel {
if !reValidLabelName.MatchString(k) {
return nil, fmt.Errorf("bad label name: %s", k)
}
}

v := util.Trim(kv[1])
if !reValidLabelValue.MatchString(v) {
return nil, fmt.Errorf("bad label value: %s", v)
}
if k == containerLabel && v == "" {
return nil, fmt.Errorf("value for %s cannot be empty string", containerLabel)
}

result[k] = v
}

if len(result) == 0 {
return nil, errors.New("at least one label must be given")
}

return result, nil
}

func makeTagFromFilter(ns string, sortedLabelNames []string, labelNames map[string]string) string {
buf := &bytes.Buffer{}

if cont, ok := labelNames[containerLabel]; ok {
if cont, ok := labelNames[util.ContainerLabel]; ok {
// if the special label _container is used then its name goes to the
// part of the tag that denotes the container
buf.WriteString(fmt.Sprintf("kube.%s.*.%s._labels.", ns, cont))
Expand All @@ -125,7 +70,7 @@ func makeTagFromFilter(ns string, sortedLabelNames []string, labelNames map[stri
}

for i, lb := range sortedLabelNames {
if lb == containerLabel {
if lb == util.ContainerLabel {
continue
}

Expand Down Expand Up @@ -157,11 +102,11 @@ func (p *expandLabelsMacroState) Process(input fluentd.Fragment) (fluentd.Fragme
return nil
}

if !strings.HasPrefix(d.Tag, macroLabels) {
if !strings.HasPrefix(d.Tag, util.MacroLabels) {
return nil
}

labelNames, err := parseTagToLabels(d.Tag)
labelNames, err := util.ParseTagToLabels(d.Tag)
if err != nil {
return err
}
Expand All @@ -180,19 +125,19 @@ func (p *expandLabelsMacroState) Process(input fluentd.Fragment) (fluentd.Fragme
return input, nil
}

delete(allReferencedLabels, containerLabel)
delete(allReferencedLabels, util.ContainerLabel)
sortedLabelNames := util.SortedKeys(allReferencedLabels)

replaceLabels := func(d *fluentd.Directive, ctx *ProcessorContext) error {
if d.Name != "filter" && d.Name != "match" {
return nil
}

if !strings.HasPrefix(d.Tag, macroLabels) {
if !strings.HasPrefix(d.Tag, util.MacroLabels) {
return nil
}

labelNames, err := parseTagToLabels(d.Tag)
labelNames, err := util.ParseTagToLabels(d.Tag)
if err != nil {
// should never happen as the error should be caught beforehand
return nil
Expand Down
42 changes: 0 additions & 42 deletions config-reloader/processors/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,6 @@ import (
"github.com/stretchr/testify/assert"
)

func TestLabelsParseOk(t *testing.T) {
inputs := map[string]map[string]string{
"$labels(a=b,,,)": {"a": "b"},
"$labels(a=1, b=2)": {"a": "1", "b": "2"},
"$labels(x=y,b=1)": {"b": "1", "x": "y"},
"$labels(x=1, b = 1)": {"b": "1", "x": "1"},
"$labels(x=1, a=)": {"a": "", "x": "1"},
"$labels(hello/world=ok, a=value)": {"hello/world": "ok", "a": "value"},
"$labels(x=1, _container=main)": {"_container": "main", "x": "1"},
}

for tag, result := range inputs {
processed, err := parseTagToLabels(tag)
assert.Nil(t, err, "Got an error instead: %+v", err)
assert.Equal(t, result, processed)
}
}

func TestSafeLabel(t *testing.T) {
// empty string is a valid label value
assert.Equal(t, "_", safeLabelValue(""))
Expand All @@ -41,30 +23,6 @@ func TestSafeLabel(t *testing.T) {
assert.Equal(t, "app_kubernetes_io/name=nginx_ingress", safeLabelValue("app.kubernetes.io/name=nginx-ingress"))
}

func TestLabelsParseNotOk(t *testing.T) {
inputs := []string{
"$labels",
"$labels()",
"$labels(=)",
"$labels(=f)",
"$labels(.=*)",
"$labels(a=.)",
"$labels(a==1)",
"$labels(-a=sfd)",
"$labels(a=-sfd)",
"$labels(a*=hello)",
"$labels(a=*)",
"$labels(a=1, =2)",
"$labels(_container=)", // empty container name
"$labels(app.kubernetes.io/name=*)",
}

for _, tag := range inputs {
res, err := parseTagToLabels(tag)
assert.NotNil(t, err, "Got this instead for %s: %+v", tag, res)
}
}

func TestLabelNoLabels(t *testing.T) {
s := `
<filter **>
Expand Down
Loading