diff --git a/config-reloader/datasource/kube_informer.go b/config-reloader/datasource/kube_informer.go index 084ead3a..261fb7e4 100644 --- a/config-reloader/datasource/kube_informer.go +++ b/config-reloader/datasource/kube_informer.go @@ -3,8 +3,11 @@ package datasource import ( "context" "fmt" + "github.com/vmware/kube-fluentd-operator/config-reloader/fluentd" + "github.com/vmware/kube-fluentd-operator/config-reloader/util" "os" "sort" + "strings" "time" "k8s.io/apimachinery/pkg/api/errors" @@ -25,14 +28,16 @@ import ( ) type kubeInformerConnection struct { - client kubernetes.Interface - hashes map[string]string - cfg *config.Config - kubeds kubedatasource.KubeDS - nslist listerv1.NamespaceLister - podlist listerv1.PodLister - cmlist listerv1.ConfigMapLister - fdlist kfoListersV1beta1.FluentdConfigLister + client kubernetes.Interface + confHashes map[string]string + mountedLabels map[string][]map[string]string + cfg *config.Config + kubeds kubedatasource.KubeDS + nslist listerv1.NamespaceLister + podlist listerv1.PodLister + cmlist listerv1.ConfigMapLister + fdlist kfoListersV1beta1.FluentdConfigLister + updateChan chan time.Time } // NewKubernetesInformerDatasource builds a new Datasource from the provided config. @@ -101,16 +106,31 @@ func NewKubernetesInformerDatasource(ctx context.Context, cfg *config.Config, up } logrus.Infof("Synced local informer with upstream Kubernetes API") - return &kubeInformerConnection{ - client: client, - hashes: make(map[string]string), - cfg: cfg, - kubeds: kubeds, - nslist: namespaceLister, - podlist: podLister, - cmlist: cmLister, - fdlist: fluentdconfigDSLister.Fdlist, - }, nil + kubeInfoCx := &kubeInformerConnection{ + client: client, + confHashes: make(map[string]string), + mountedLabels: make(map[string][]map[string]string), + cfg: cfg, + kubeds: kubeds, + nslist: namespaceLister, + podlist: podLister, + cmlist: cmLister, + fdlist: fluentdconfigDSLister.Fdlist, + updateChan: updateChan, + } + + factory.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + kubeInfoCx.handlePodChange(ctx, obj) + }, + UpdateFunc: func(old, obj interface{}) { + }, + DeleteFunc: func(obj interface{}) { + kubeInfoCx.handlePodChange(ctx, obj) + }, + }) + + return kubeInfoCx, nil } // GetNamespaces queries the configured Kubernetes API to generate a list of NamespaceConfig objects. @@ -136,6 +156,26 @@ func (d *kubeInformerConnection) GetNamespaces(ctx context.Context) ([]*Namespac return nil, err } + fragment, err := fluentd.ParseString(configdata) + if err != nil { + return nil, err + } + + var mountedLabels []map[string]string + for _, frag := range fragment { + if frag.Name == "source" && frag.Type() == "mounted-file" { + paramLabels := frag.Param("labels") + paramLabels = util.TrimTrailingComment(paramLabels) + currLabels, err := util.ParseTagToLabels(fmt.Sprintf("$labels(%s)", paramLabels)) + if err != nil { + return nil, err + } + mountedLabels = append(mountedLabels, currLabels) + } + } + + d.updateMountedLabels(ns, mountedLabels) + // Create a compact representation of the pods running in the namespace // under consideration pods, err := d.podlist.Pods(ns).List(labels.NewSelector()) @@ -155,7 +195,7 @@ func (d *kubeInformerConnection) GetNamespaces(ctx context.Context) ([]*Namespac nsconfigs = append(nsconfigs, &NamespaceConfig{ Name: ns, FluentdConfig: configdata, - PreviousConfigHash: d.hashes[ns], + PreviousConfigHash: d.confHashes[ns], Labels: nsobj.Labels, MiniContainers: minis, }) @@ -166,7 +206,11 @@ func (d *kubeInformerConnection) GetNamespaces(ctx context.Context) ([]*Namespac // WriteCurrentConfigHash is a setter for the hashtable maintained by this Datasource func (d *kubeInformerConnection) WriteCurrentConfigHash(namespace string, hash string) { - d.hashes[namespace] = hash + d.confHashes[namespace] = hash +} + +func (d *kubeInformerConnection) updateMountedLabels(namespace string, labels []map[string]string) { + d.mountedLabels[namespace] = labels } // UpdateStatus updates a namespace's status annotation with the latest result @@ -239,6 +283,13 @@ func (d *kubeInformerConnection) discoverNamespaces(ctx context.Context) ([]stri for _, cfmap := range confMapsList { if cfmap.ObjectMeta.Name == d.cfg.DefaultConfigmapName { namespaces = append(namespaces, cfmap.ObjectMeta.Namespace) + } else { + // We need to find configmaps that honor the global annotation for configmaps: + configMapNamespace, _ := d.nslist.Get(cfmap.ObjectMeta.Namespace) + configMapName := configMapNamespace.Annotations[d.cfg.AnnotConfigmapName] + if configMapName != "" { + namespaces = append(namespaces, cfmap.ObjectMeta.Namespace) + } } } if d.cfg.CRDMigrationMode { @@ -275,6 +326,39 @@ func (d *kubeInformerConnection) discoverNamespaces(ctx context.Context) ([]stri return nsList, nil } +func (d *kubeInformerConnection) handlePodChange(ctx context.Context, obj interface{}) { + mObj := obj.(*core.Pod) + logrus.Infof("Detected pod change %s in namespace: %s", mObj.GetName(), mObj.GetNamespace()) + configdata, err := d.kubeds.GetFluentdConfig(ctx, mObj.GetNamespace()) + nsConfigStr := fmt.Sprintf("%#v", configdata) + + if err == nil { + if strings.Contains(nsConfigStr, "mounted-file") { + podLabels := mObj.GetLabels() + mountedLabel := d.mountedLabels[mObj.GetNamespace()] + for _, container := range mObj.Spec.Containers { + if matchAny(podLabels, mountedLabel, container.Name) { + logrus.Infof("Detected mounted-file pod change %s in namespace: %s", mObj.GetName(), mObj.GetNamespace()) + select { + case d.updateChan <- time.Now(): + default: + } + } + } + } + } +} + +func matchAny(contLabels map[string]string, mountedLabelsInNs []map[string]string, name string) bool { + for _, mountedLabels := range mountedLabelsInNs { + if util.Match(mountedLabels, contLabels, name) { + return true + } + } + + return false +} + func (d *kubeInformerConnection) discoverFluentdConfigNamespaces() ([]string, error) { if d.fdlist == nil { return nil, fmt.Errorf("Failed to initialize the fluentdconfig crd client, d.fclient = nil") diff --git a/config-reloader/processors/labels.go b/config-reloader/processors/labels.go index 1b37944e..02bc6441 100644 --- a/config-reloader/processors/labels.go +++ b/config-reloader/processors/labels.go @@ -5,7 +5,6 @@ package processors import ( "bytes" - "errors" "fmt" "reflect" "regexp" @@ -16,11 +15,6 @@ import ( "github.com/vmware/kube-fluentd-operator/config-reloader/util" ) -const ( - macroLabels = "$labels" - containerLabel = "_container" -) - type expandLabelsMacroState struct { BaseProcessorState } @@ -33,9 +27,6 @@ var reSafe = regexp.MustCompile(`[.-]|^$`) // an alphanumeric character (e.g. 'MyValue', or 'my_value', or '12345', regex used for validation is // '(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?' -var reValidLabelName = regexp.MustCompile(`^([A-Za-z0-9][-A-Za-z0-9\/_.]*)?[A-Za-z0-9]$`) -var reValidLabelValue = regexp.MustCompile(`^(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?$`) - var fns = template.FuncMap{ "last": func(x int, a interface{}) bool { return x == reflect.ValueOf(a).Len()-1 @@ -67,56 +58,10 @@ var retagTemplate = template.Must(template.New("retagTemplate").Funcs(fns).Parse `)) -func parseTagToLabels(tag string) (map[string]string, error) { - if !strings.HasPrefix(tag, macroLabels+"(") && - !strings.HasSuffix(tag, ")") { - return nil, fmt.Errorf("bad $labels macro use: %s", tag) - } - - labelsOnly := tag[len(macroLabels)+1 : len(tag)-1] - - result := map[string]string{} - - records := strings.Split(labelsOnly, ",") - for _, rec := range records { - if rec == "" { - // be generous - continue - } - kv := strings.Split(rec, "=") - if len(kv) != 2 { - return nil, fmt.Errorf("bad label definition: %s", kv) - } - - k := util.Trim(kv[0]) - if k != containerLabel { - if !reValidLabelName.MatchString(k) { - return nil, fmt.Errorf("bad label name: %s", k) - } - } - - v := util.Trim(kv[1]) - if !reValidLabelValue.MatchString(v) { - return nil, fmt.Errorf("bad label value: %s", v) - } - if k == containerLabel && v == "" { - return nil, fmt.Errorf("value for %s cannot be empty string", containerLabel) - } - - result[k] = v - } - - if len(result) == 0 { - return nil, errors.New("at least one label must be given") - } - - return result, nil -} - func makeTagFromFilter(ns string, sortedLabelNames []string, labelNames map[string]string) string { buf := &bytes.Buffer{} - if cont, ok := labelNames[containerLabel]; ok { + if cont, ok := labelNames[util.ContainerLabel]; ok { // if the special label _container is used then its name goes to the // part of the tag that denotes the container buf.WriteString(fmt.Sprintf("kube.%s.*.%s._labels.", ns, cont)) @@ -125,7 +70,7 @@ func makeTagFromFilter(ns string, sortedLabelNames []string, labelNames map[stri } for i, lb := range sortedLabelNames { - if lb == containerLabel { + if lb == util.ContainerLabel { continue } @@ -157,11 +102,11 @@ func (p *expandLabelsMacroState) Process(input fluentd.Fragment) (fluentd.Fragme return nil } - if !strings.HasPrefix(d.Tag, macroLabels) { + if !strings.HasPrefix(d.Tag, util.MacroLabels) { return nil } - labelNames, err := parseTagToLabels(d.Tag) + labelNames, err := util.ParseTagToLabels(d.Tag) if err != nil { return err } @@ -180,7 +125,7 @@ func (p *expandLabelsMacroState) Process(input fluentd.Fragment) (fluentd.Fragme return input, nil } - delete(allReferencedLabels, containerLabel) + delete(allReferencedLabels, util.ContainerLabel) sortedLabelNames := util.SortedKeys(allReferencedLabels) replaceLabels := func(d *fluentd.Directive, ctx *ProcessorContext) error { @@ -188,11 +133,11 @@ func (p *expandLabelsMacroState) Process(input fluentd.Fragment) (fluentd.Fragme return nil } - if !strings.HasPrefix(d.Tag, macroLabels) { + if !strings.HasPrefix(d.Tag, util.MacroLabels) { return nil } - labelNames, err := parseTagToLabels(d.Tag) + labelNames, err := util.ParseTagToLabels(d.Tag) if err != nil { // should never happen as the error should be caught beforehand return nil diff --git a/config-reloader/processors/labels_test.go b/config-reloader/processors/labels_test.go index ae422e4e..39a9eed6 100644 --- a/config-reloader/processors/labels_test.go +++ b/config-reloader/processors/labels_test.go @@ -12,24 +12,6 @@ import ( "github.com/stretchr/testify/assert" ) -func TestLabelsParseOk(t *testing.T) { - inputs := map[string]map[string]string{ - "$labels(a=b,,,)": {"a": "b"}, - "$labels(a=1, b=2)": {"a": "1", "b": "2"}, - "$labels(x=y,b=1)": {"b": "1", "x": "y"}, - "$labels(x=1, b = 1)": {"b": "1", "x": "1"}, - "$labels(x=1, a=)": {"a": "", "x": "1"}, - "$labels(hello/world=ok, a=value)": {"hello/world": "ok", "a": "value"}, - "$labels(x=1, _container=main)": {"_container": "main", "x": "1"}, - } - - for tag, result := range inputs { - processed, err := parseTagToLabels(tag) - assert.Nil(t, err, "Got an error instead: %+v", err) - assert.Equal(t, result, processed) - } -} - func TestSafeLabel(t *testing.T) { // empty string is a valid label value assert.Equal(t, "_", safeLabelValue("")) @@ -41,30 +23,6 @@ func TestSafeLabel(t *testing.T) { assert.Equal(t, "app_kubernetes_io/name=nginx_ingress", safeLabelValue("app.kubernetes.io/name=nginx-ingress")) } -func TestLabelsParseNotOk(t *testing.T) { - inputs := []string{ - "$labels", - "$labels()", - "$labels(=)", - "$labels(=f)", - "$labels(.=*)", - "$labels(a=.)", - "$labels(a==1)", - "$labels(-a=sfd)", - "$labels(a=-sfd)", - "$labels(a*=hello)", - "$labels(a=*)", - "$labels(a=1, =2)", - "$labels(_container=)", // empty container name - "$labels(app.kubernetes.io/name=*)", - } - - for _, tag := range inputs { - res, err := parseTagToLabels(tag) - assert.NotNil(t, err, "Got this instead for %s: %+v", tag, res) - } -} - func TestLabelNoLabels(t *testing.T) { s := ` diff --git a/config-reloader/processors/mounted_file.go b/config-reloader/processors/mounted_file.go index a3e7adfd..351d3ff9 100644 --- a/config-reloader/processors/mounted_file.go +++ b/config-reloader/processors/mounted_file.go @@ -43,7 +43,7 @@ func (state *mountedFileState) Prepare(input fluentd.Fragment) (fluentd.Fragment } paramLabels = util.TrimTrailingComment(paramLabels) - labels, err := parseTagToLabels(fmt.Sprintf("$labels(%s)", paramLabels)) + labels, err := util.ParseTagToLabels(fmt.Sprintf("$labels(%s)", paramLabels)) if err != nil { return nil, err } @@ -53,7 +53,7 @@ func (state *mountedFileState) Prepare(input fluentd.Fragment) (fluentd.Fragment var addedLabels map[string]string if paramAddedLabels != "" { // no added labels is just fine - addedLabels, err = parseTagToLabels(fmt.Sprintf("$labels(%s)", paramAddedLabels)) + addedLabels, err = util.ParseTagToLabels(fmt.Sprintf("$labels(%s)", paramAddedLabels)) if err != nil { return nil, err } @@ -86,18 +86,7 @@ func (state *mountedFileState) Prepare(input fluentd.Fragment) (fluentd.Fragment } func matches(spec *ContainerFile, mini *datasource.MiniContainer) bool { - for k, v := range spec.Labels { - contValue := mini.Labels[k] - if k == "_container" { - contValue = mini.Name - } - - if v != contValue { - return false - } - } - - return true + return util.Match(spec.Labels, mini.Labels, mini.Name) } func (state *mountedFileState) convertToFragement(cf *ContainerFile) fluentd.Fragment { diff --git a/config-reloader/processors/thisns.go b/config-reloader/processors/thisns.go index abf1286f..d9e80d07 100644 --- a/config-reloader/processors/thisns.go +++ b/config-reloader/processors/thisns.go @@ -5,6 +5,7 @@ package processors import ( "fmt" + "github.com/vmware/kube-fluentd-operator/config-reloader/util" "strings" "github.com/vmware/kube-fluentd-operator/config-reloader/fluentd" @@ -42,7 +43,7 @@ func (p *expandThisnsMacroState) Process(input fluentd.Fragment) (fluentd.Fragme return nil } - if strings.HasPrefix(d.Tag, macroLabels) || strings.HasPrefix(d.Tag, macroUniqueTag) { + if strings.HasPrefix(d.Tag, util.MacroLabels) || strings.HasPrefix(d.Tag, macroUniqueTag) { // Let other processors handle this return nil } diff --git a/config-reloader/util/util.go b/config-reloader/util/util.go index 2abc6b6c..fe606f9b 100644 --- a/config-reloader/util/util.go +++ b/config-reloader/util/util.go @@ -7,10 +7,12 @@ import ( "bytes" "crypto/sha256" "encoding/hex" + "errors" "fmt" "io/ioutil" "os" "os/exec" + "regexp" "sort" "strings" "text/template" @@ -21,10 +23,15 @@ import ( ) const ( - maskFile = 0664 - maskDirectory = 0775 + maskFile = 0664 + maskDirectory = 0775 + MacroLabels = "$labels" + ContainerLabel = "_container" ) +var reValidLabelName = regexp.MustCompile(`^([A-Za-z0-9][-A-Za-z0-9\/_.]*)?[A-Za-z0-9]$`) +var reValidLabelValue = regexp.MustCompile(`^(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?$`) + func Trim(s string) string { return strings.TrimSpace(s) } @@ -129,6 +136,66 @@ func TrimTrailingComment(line string) string { return line } +func ParseTagToLabels(tag string) (map[string]string, error) { + if !strings.HasPrefix(tag, MacroLabels+"(") && + !strings.HasSuffix(tag, ")") { + return nil, fmt.Errorf("bad $labels macro use: %s", tag) + } + + labelsOnly := tag[len(MacroLabels)+1 : len(tag)-1] + + result := map[string]string{} + + records := strings.Split(labelsOnly, ",") + for _, rec := range records { + if rec == "" { + // be generous + continue + } + kv := strings.Split(rec, "=") + if len(kv) != 2 { + return nil, fmt.Errorf("bad label definition: %s", kv) + } + + k := Trim(kv[0]) + if k != ContainerLabel { + if !reValidLabelName.MatchString(k) { + return nil, fmt.Errorf("bad label name: %s", k) + } + } + + v := Trim(kv[1]) + if !reValidLabelValue.MatchString(v) { + return nil, fmt.Errorf("bad label value: %s", v) + } + if k == ContainerLabel && v == "" { + return nil, fmt.Errorf("value for %s cannot be empty string", ContainerLabel) + } + + result[k] = v + } + + if len(result) == 0 { + return nil, errors.New("at least one label must be given") + } + + return result, nil +} + +func Match(labels map[string]string, contLabels map[string]string, contName string) bool { + for k, v := range labels { + value := contLabels[k] + if k == "_container" { + value = contName + } + + if v != value { + return false + } + } + return true +} + func EnsureDirExists(dir string) error { if _, err := os.Stat(dir); os.IsNotExist(err) { err = os.Mkdir(dir, maskDirectory) diff --git a/config-reloader/util/util_test.go b/config-reloader/util/util_test.go index 523995a4..e5dde9ef 100644 --- a/config-reloader/util/util_test.go +++ b/config-reloader/util/util_test.go @@ -43,6 +43,71 @@ func TestTrimTrailingComment(t *testing.T) { assert.Equal(t, "a", TrimTrailingComment("a#########")) } +func TestLabelsParseOk(t *testing.T) { + inputs := map[string]map[string]string{ + "$labels(a=b,,,)": {"a": "b"}, + "$labels(a=1, b=2)": {"a": "1", "b": "2"}, + "$labels(x=y,b=1)": {"b": "1", "x": "y"}, + "$labels(x=1, b = 1)": {"b": "1", "x": "1"}, + "$labels(x=1, a=)": {"a": "", "x": "1"}, + "$labels(hello/world=ok, a=value)": {"hello/world": "ok", "a": "value"}, + "$labels(x=1, _container=main)": {"_container": "main", "x": "1"}, + } + + for tag, result := range inputs { + processed, err := ParseTagToLabels(tag) + assert.Nil(t, err, "Got an error instead: %+v", err) + assert.Equal(t, result, processed) + } +} + +func TestLabelsParseNotOk(t *testing.T) { + inputs := []string{ + "$labels", + "$labels()", + "$labels(=)", + "$labels(=f)", + "$labels(.=*)", + "$labels(a=.)", + "$labels(a==1)", + "$labels(-a=sfd)", + "$labels(a=-sfd)", + "$labels(a*=hello)", + "$labels(a=*)", + "$labels(a=1, =2)", + "$labels(_container=)", // empty container name + "$labels(app.kubernetes.io/name=*)", + } + + for _, tag := range inputs { + res, err := ParseTagToLabels(tag) + assert.NotNil(t, err, "Got this instead for %s: %+v", tag, res) + } +} + +func TestMatch(t *testing.T) { + containerLabels := map[string]string{"key": "value"} + containerName := "container-name" + + var labels map[string]string = nil + assert.True(t, Match(labels, containerLabels, containerName)) + + labels = map[string]string{"_container": containerName} + assert.True(t, Match(labels, containerLabels, containerName)) + + labels = map[string]string{"a": "a"} + assert.False(t, Match(labels, containerLabels, containerName)) + + labels = map[string]string{"key": "value"} + assert.True(t, Match(labels, containerLabels, containerName)) + + labels = map[string]string{"key": "value", "_container": "container-name"} + assert.True(t, Match(labels, containerLabels, containerName)) + + labels = map[string]string{"a": "a", "key": "value", "_container": "container-name"} + assert.False(t, Match(labels, containerLabels, containerName)) +} + func TestEnsureDirExits(t *testing.T) { type testDirConfig struct { @@ -71,5 +136,4 @@ func TestEnsureDirExits(t *testing.T) { os.Remove(config.folderName) } } - }