From 81b51438add2c17d27b2630a80cd936a59766a11 Mon Sep 17 00:00:00 2001 From: Meng Yan Date: Thu, 9 May 2024 12:15:47 +0800 Subject: [PATCH] :seedling: Persistent the agent event state to the configmap (#899) * Add time filter Signed-off-by: myan * add more tests Signed-off-by: myan * format Signed-off-by: Meng Yan * refactor test Signed-off-by: myan * fix sonar Signed-off-by: myan * reply review Signed-off-by: myan * use creattionTimestamp Signed-off-by: myan --------- Signed-off-by: myan Signed-off-by: Meng Yan --- agent/pkg/status/controller/controller.go | 6 + .../event/event_integration_test.go | 135 +--------------- .../event/local_replicated_policy_emitter.go | 24 +-- .../local_replicated_policy_emitter_test.go | 95 ++++++++++++ .../event/local_root_policy_emitter.go | 32 ++-- .../event/local_root_policy_emitter_test.go | 123 +++++++++++++++ .../status/controller/filter/time_filter.go | 144 ++++++++++++++++++ .../controller/filter/time_filter_test.go | 136 +++++++++++++++++ .../generic/generic_event_syncer.go | 1 + .../generic/generic_object_syncer.go | 1 + .../policies/status_event_emitter.go | 20 ++- 11 files changed, 551 insertions(+), 166 deletions(-) create mode 100644 agent/pkg/status/controller/event/local_replicated_policy_emitter_test.go create mode 100644 agent/pkg/status/controller/event/local_root_policy_emitter_test.go create mode 100644 agent/pkg/status/controller/filter/time_filter.go create mode 100644 agent/pkg/status/controller/filter/time_filter_test.go diff --git a/agent/pkg/status/controller/controller.go b/agent/pkg/status/controller/controller.go index 2bd1e15d4..3540973d9 100644 --- a/agent/pkg/status/controller/controller.go +++ b/agent/pkg/status/controller/controller.go @@ -13,6 +13,7 @@ import ( "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/apps" agentstatusconfig "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/config" "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/event" + "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/filter" "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/hubcluster" "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/managedclusters" "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/placement" @@ -77,5 +78,10 @@ func AddControllers(ctx context.Context, mgr ctrl.Manager, agentConfig *config.A if err := apps.LaunchSubscriptionReportSyncer(ctx, mgr, agentConfig, producer); err != nil { return fmt.Errorf("failed to launch subscription report syncer: %w", err) } + + // lunch a time filter, it must be called after filter.RegisterTimeFilter(key) + if err := filter.LaunchTimeFilter(ctx, mgr.GetClient(), agentConfig.PodNameSpace); err != nil { + return fmt.Errorf("failed to launch time filter: %w", err) + } return nil } diff --git a/agent/pkg/status/controller/event/event_integration_test.go b/agent/pkg/status/controller/event/event_integration_test.go index 198b20f22..696f70420 100644 --- a/agent/pkg/status/controller/event/event_integration_test.go +++ b/agent/pkg/status/controller/event/event_integration_test.go @@ -1,139 +1,10 @@ package event import ( - "encoding/json" - "fmt" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - clusterv1 "open-cluster-management.io/api/cluster/v1" - policyv1 "open-cluster-management.io/governance-policy-propagator/api/v1" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/stolostron/multicluster-global-hub/pkg/bundle/event" - "github.com/stolostron/multicluster-global-hub/pkg/constants" - "github.com/stolostron/multicluster-global-hub/pkg/enum" ) -var _ = Describe("test the policy emitter", Ordered, func() { - It("should pass the root policy event", func() { - By("Creating a root policy") - rootPolicy := &policyv1.Policy{ - ObjectMeta: metav1.ObjectMeta{ - Name: "policy1", - Namespace: "default", - Finalizers: []string{constants.GlobalHubCleanupFinalizer}, - }, - Spec: policyv1.PolicySpec{ - Disabled: true, - PolicyTemplates: []*policyv1.PolicyTemplate{}, - }, - Status: policyv1.PolicyStatus{ - ComplianceState: policyv1.Compliant, - }, - } - Expect(kubeClient.Create(ctx, rootPolicy)).NotTo(HaveOccurred()) - - evt := &corev1.Event{ - ObjectMeta: metav1.ObjectMeta{ - Name: "policy1.123r543243242", - Namespace: "default", - }, - InvolvedObject: v1.ObjectReference{ - Kind: string(policyv1.Kind), - Namespace: "default", - Name: "policy1", - }, - Reason: "PolicyPropagation", - Message: "Policy default/policy1 was propagated to cluster1", - Source: corev1.EventSource{ - Component: "policy-propagator", - }, - } - Expect(kubeClient.Create(ctx, evt)).NotTo(HaveOccurred()) - - receivedEvent := <-consumer.EventChan() - fmt.Sprintln(receivedEvent) - Expect(string(enum.LocalRootPolicyEventType)).To(Equal(receivedEvent.Type())) - - rootPolicyEvents := []event.RootPolicyEvent{} - err := json.Unmarshal(receivedEvent.Data(), &rootPolicyEvents) - Expect(err).Should(Succeed()) - Expect(rootPolicyEvents[0].EventName).To(Equal(evt.Name)) - }) - - It("should pass the replicated policy event", func() { - By("Create namespace and cluster for the replicated policy") - err := kubeClient.Create(ctx, &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: "cluster1", - }, - }, &client.CreateOptions{}) - Expect(err).Should(Succeed()) - - By("Create the cluster") - cluster := &clusterv1.ManagedCluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "cluster1", - }, - } - Expect(kubeClient.Create(ctx, cluster, &client.CreateOptions{})).Should(Succeed()) - cluster.Status = clusterv1.ManagedClusterStatus{ - ClusterClaims: []clusterv1.ManagedClusterClaim{ - { - Name: "id.k8s.io", - Value: "3f406177-34b2-4852-88dd-ff2809680336", - }, - }, - } - Expect(kubeClient.Status().Update(ctx, cluster)).Should(Succeed()) - - By("Create the replicated policy") - replicatedPolicy := &policyv1.Policy{ - ObjectMeta: metav1.ObjectMeta{ - Name: "default.policy1", - Namespace: "cluster1", - Labels: map[string]string{ - constants.PolicyEventRootPolicyNameLabelKey: fmt.Sprintf("%s.%s", "default", "policy1"), - constants.PolicyEventClusterNameLabelKey: "cluster1", - }, - }, - Spec: policyv1.PolicySpec{ - Disabled: false, - PolicyTemplates: []*policyv1.PolicyTemplate{}, - }, - } - Expect(kubeClient.Create(ctx, replicatedPolicy)).ToNot(HaveOccurred()) - - By("Create the replicated policy event") - evt := &corev1.Event{ - ObjectMeta: metav1.ObjectMeta{ - Name: "default.policy1.17af98f19c06811e", - Namespace: "cluster1", - }, - InvolvedObject: v1.ObjectReference{ - Kind: string(policyv1.Kind), - Namespace: "cluster1", - Name: "default.policy1", - }, - Reason: "PolicyStatusSync", - Message: "Policy default.policy1 status was updated in cluster", - Source: corev1.EventSource{ - Component: "policy-status-sync", - }, - } - Expect(kubeClient.Create(ctx, evt)).NotTo(HaveOccurred()) - - receivedEvent := <-consumer.EventChan() - fmt.Sprintln(receivedEvent) - Expect(string(enum.LocalReplicatedPolicyEventType)).To(Equal(receivedEvent.Type())) - - replicatedPolicyEvents := []event.ReplicatedPolicyEvent{} - err = json.Unmarshal(receivedEvent.Data(), &replicatedPolicyEvents) - Expect(err).Should(Succeed()) - Expect(replicatedPolicyEvents[0].EventName).To(Equal(evt.Name)) - }) +var _ = Describe("Events emitters", Ordered, func() { + Context("with root policy events", Ordered, localRootPolicyEventTestSpecs) + Context("with replicated policy events", Ordered, localReplicatedPolicyEventTestSpecs) }) diff --git a/agent/pkg/status/controller/event/local_replicated_policy_emitter.go b/agent/pkg/status/controller/event/local_replicated_policy_emitter.go index 6a72d1167..6a057201e 100644 --- a/agent/pkg/status/controller/event/local_replicated_policy_emitter.go +++ b/agent/pkg/status/controller/event/local_replicated_policy_emitter.go @@ -3,15 +3,16 @@ package event import ( "context" "fmt" + "strings" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/go-logr/logr" - lru "github.com/hashicorp/golang-lru" corev1 "k8s.io/api/core/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/config" + "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/filter" "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/generic" "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/policies" "github.com/stolostron/multicluster-global-hub/pkg/bundle/event" @@ -60,29 +61,30 @@ var _ generic.ObjectEmitter = &localReplicatedPolicyEmitter{} type localReplicatedPolicyEmitter struct { ctx context.Context + name string log logr.Logger eventType string runtimeClient client.Client currentVersion *version.Version lastSentVersion version.Version payload event.ReplicatedPolicyEventBundle - cache *lru.Cache topic string } func NewLocalReplicatedPolicyEmitter(ctx context.Context, runtimeClient client.Client, topic string, ) generic.ObjectEmitter { - cache, _ := lru.New(20) + name := strings.Replace(string(enum.LocalReplicatedPolicyEventType), enum.EventTypePrefix, "", -1) + filter.RegisterTimeFilter(name) return &localReplicatedPolicyEmitter{ ctx: ctx, - log: ctrl.Log.WithName("policy-event-syncer/replicated-policy"), + name: name, + log: ctrl.Log.WithName(name), eventType: string(enum.LocalReplicatedPolicyEventType), topic: topic, runtimeClient: runtimeClient, currentVersion: version.NewVersion(), lastSentVersion: *version.NewVersion(), - cache: cache, payload: make([]event.ReplicatedPolicyEvent, 0), } } @@ -96,7 +98,7 @@ func (h *localReplicatedPolicyEmitter) ShouldUpdate(obj client.Object) bool { if config.GetEnableLocalPolicy() != config.EnableLocalPolicyTrue { return false } - policy, ok := policyEventPredicate(h.ctx, obj, h.runtimeClient, h.log) + policy, ok := policyEventPredicate(h.ctx, h.name, obj, h.runtimeClient, h.log) return ok && !utils.HasAnnotation(policy, constants.OriginOwnerReferenceAnnotation) && utils.HasItemKey(policy.GetLabels(), constants.PolicyEventRootPolicyNameLabelKey) @@ -115,11 +117,6 @@ func (h *localReplicatedPolicyEmitter) Update(obj client.Object) bool { if !ok { return false } - // if exist, then return - evtKey := getEventKey(evt) - if h.cache.Contains(evtKey) { - return false - } // get policy policy, err := getInvolvePolicy(h.ctx, h.runtimeClient, evt) @@ -151,7 +148,6 @@ func (h *localReplicatedPolicyEmitter) Update(obj client.Object) bool { } // cache to events and update version h.payload = append(h.payload, replicatedPolicyEvent) - h.cache.Add(evtKey, nil) return true } @@ -173,6 +169,10 @@ func (h *localReplicatedPolicyEmitter) ToCloudEvent() (*cloudevents.Event, error } func (h *localReplicatedPolicyEmitter) PostSend() { + // update the time filter: with latest event + for _, evt := range h.payload { + filter.CacheTime(h.name, evt.CreatedAt.Time) + } // update version and clean the cache h.payload = make([]event.ReplicatedPolicyEvent, 0) h.currentVersion.Next() diff --git a/agent/pkg/status/controller/event/local_replicated_policy_emitter_test.go b/agent/pkg/status/controller/event/local_replicated_policy_emitter_test.go new file mode 100644 index 000000000..54fad001f --- /dev/null +++ b/agent/pkg/status/controller/event/local_replicated_policy_emitter_test.go @@ -0,0 +1,95 @@ +package event + +import ( + "encoding/json" + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clusterv1 "open-cluster-management.io/api/cluster/v1" + policyv1 "open-cluster-management.io/governance-policy-propagator/api/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/stolostron/multicluster-global-hub/pkg/bundle/event" + "github.com/stolostron/multicluster-global-hub/pkg/constants" + "github.com/stolostron/multicluster-global-hub/pkg/enum" +) + +func localReplicatedPolicyEventTestSpecs() { + It("should pass the replicated policy event", func() { + By("Create namespace and cluster for the replicated policy") + err := kubeClient.Create(ctx, &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster1", + }, + }, &client.CreateOptions{}) + Expect(err).Should(Succeed()) + + By("Create the cluster") + cluster := &clusterv1.ManagedCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster1", + }, + } + Expect(kubeClient.Create(ctx, cluster, &client.CreateOptions{})).Should(Succeed()) + cluster.Status = clusterv1.ManagedClusterStatus{ + ClusterClaims: []clusterv1.ManagedClusterClaim{ + { + Name: "id.k8s.io", + Value: "3f406177-34b2-4852-88dd-ff2809680336", + }, + }, + } + Expect(kubeClient.Status().Update(ctx, cluster)).Should(Succeed()) + + By("Create the replicated policy") + replicatedPolicy := &policyv1.Policy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "default.policy1", + Namespace: "cluster1", + Labels: map[string]string{ + constants.PolicyEventRootPolicyNameLabelKey: fmt.Sprintf("%s.%s", "default", "policy1"), + constants.PolicyEventClusterNameLabelKey: "cluster1", + }, + }, + Spec: policyv1.PolicySpec{ + Disabled: false, + PolicyTemplates: []*policyv1.PolicyTemplate{}, + }, + } + Expect(kubeClient.Create(ctx, replicatedPolicy)).ToNot(HaveOccurred()) + + By("Create the replicated policy event") + evt := &corev1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Name: "default.policy1.17af98f19c06811e", + Namespace: "cluster1", + }, + InvolvedObject: v1.ObjectReference{ + Kind: string(policyv1.Kind), + Namespace: "cluster1", + Name: "default.policy1", + }, + Reason: "PolicyStatusSync", + Message: "Policy default.policy1 status was updated in cluster", + Source: corev1.EventSource{ + Component: "policy-status-sync", + }, + LastTimestamp: metav1.Time{Time: time.Now()}, + } + Expect(kubeClient.Create(ctx, evt)).NotTo(HaveOccurred()) + + receivedEvent := <-consumer.EventChan() + fmt.Println(">>>>>>>>>>>>>>>>>>> replicated policy event", receivedEvent) + Expect(string(enum.LocalReplicatedPolicyEventType)).To(Equal(receivedEvent.Type())) + + replicatedPolicyEvents := []event.ReplicatedPolicyEvent{} + err = json.Unmarshal(receivedEvent.Data(), &replicatedPolicyEvents) + Expect(err).Should(Succeed()) + Expect(replicatedPolicyEvents[0].EventName).To(Equal(evt.Name)) + }) +} diff --git a/agent/pkg/status/controller/event/local_root_policy_emitter.go b/agent/pkg/status/controller/event/local_root_policy_emitter.go index 8e2e56bf5..ec26ecc18 100644 --- a/agent/pkg/status/controller/event/local_root_policy_emitter.go +++ b/agent/pkg/status/controller/event/local_root_policy_emitter.go @@ -3,10 +3,10 @@ package event import ( "context" "fmt" + "strings" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/go-logr/logr" - lru "github.com/hashicorp/golang-lru" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1" @@ -14,6 +14,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/config" + "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/filter" "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/generic" "github.com/stolostron/multicluster-global-hub/pkg/bundle/event" "github.com/stolostron/multicluster-global-hub/pkg/bundle/version" @@ -27,27 +28,28 @@ var _ generic.ObjectEmitter = &localRootPolicyEmitter{} type localRootPolicyEmitter struct { ctx context.Context + name string log logr.Logger runtimeClient client.Client eventType string topic string currentVersion *version.Version lastSentVersion version.Version - cache *lru.Cache payload event.RootPolicyEventBundle } func NewLocalRootPolicyEmitter(ctx context.Context, c client.Client, topic string) *localRootPolicyEmitter { - cache, _ := lru.New(20) + name := strings.Replace(string(enum.LocalRootPolicyEventType), enum.EventTypePrefix, "", -1) + filter.RegisterTimeFilter(name) return &localRootPolicyEmitter{ ctx: ctx, - log: ctrl.Log.WithName("policy-event-sycner/local-root-policy"), + name: name, + log: ctrl.Log.WithName(name), eventType: string(enum.LocalRootPolicyEventType), topic: transport.GenericEventTopic, runtimeClient: c, currentVersion: version.NewVersion(), lastSentVersion: *version.NewVersion(), - cache: cache, payload: make([]event.RootPolicyEvent, 0), } } @@ -61,13 +63,13 @@ func (h *localRootPolicyEmitter) ShouldUpdate(obj client.Object) bool { return false } - policy, ok := policyEventPredicate(h.ctx, obj, h.runtimeClient, h.log) + policy, ok := policyEventPredicate(h.ctx, h.name, obj, h.runtimeClient, h.log) return ok && !utils.HasAnnotation(policy, constants.OriginOwnerReferenceAnnotation) && !utils.HasLabel(policy, constants.PolicyEventRootPolicyNameLabelKey) } -func policyEventPredicate(ctx context.Context, obj client.Object, c client.Client, log logr.Logger) ( +func policyEventPredicate(ctx context.Context, name string, obj client.Object, c client.Client, log logr.Logger) ( *policiesv1.Policy, bool, ) { evt, ok := obj.(*corev1.Event) @@ -75,6 +77,11 @@ func policyEventPredicate(ctx context.Context, obj client.Object, c client.Clien return nil, false } + // if it's a older event, then return false + if !filter.Newer(name, evt.LastTimestamp.Time) { + return nil, false + } + if evt.InvolvedObject.Kind != policiesv1.Kind { return nil, false } @@ -93,11 +100,6 @@ func (h *localRootPolicyEmitter) Update(obj client.Object) bool { if !ok { return false } - // if exist, then return - evtKey := getEventKey(evt) - if ok = h.cache.Contains(evtKey); ok { - return false - } // get policy policy, err := getInvolvePolicy(h.ctx, h.runtimeClient, evt) @@ -121,9 +123,7 @@ func (h *localRootPolicyEmitter) Update(obj client.Object) bool { PolicyID: string(policy.GetUID()), Compliance: policyCompliance(policy, evt), } - // cache to events and update version h.payload = append(h.payload, rootPolicyEvent) - h.cache.Add(evtKey, nil) return true } @@ -154,6 +154,10 @@ func (h *localRootPolicyEmitter) Topic() string { } func (h *localRootPolicyEmitter) PostSend() { + // update the time filter: with latest event + for _, evt := range h.payload { + filter.CacheTime(h.name, evt.CreatedAt.Time) + } // update version and clean the cache h.payload = make([]event.RootPolicyEvent, 0) // 1. the version get into the next generation diff --git a/agent/pkg/status/controller/event/local_root_policy_emitter_test.go b/agent/pkg/status/controller/event/local_root_policy_emitter_test.go new file mode 100644 index 000000000..6940cf0a2 --- /dev/null +++ b/agent/pkg/status/controller/event/local_root_policy_emitter_test.go @@ -0,0 +1,123 @@ +package event + +import ( + "encoding/json" + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + policyv1 "open-cluster-management.io/governance-policy-propagator/api/v1" + + "github.com/stolostron/multicluster-global-hub/pkg/bundle/event" + "github.com/stolostron/multicluster-global-hub/pkg/constants" + "github.com/stolostron/multicluster-global-hub/pkg/enum" +) + +func localRootPolicyEventTestSpecs() { + eventTime := time.Now() + It("should pass the root policy event", func() { + By("Creating a root policy") + rootPolicy := &policyv1.Policy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "policy1", + Namespace: "default", + Finalizers: []string{constants.GlobalHubCleanupFinalizer}, + }, + Spec: policyv1.PolicySpec{ + Disabled: true, + PolicyTemplates: []*policyv1.PolicyTemplate{}, + }, + Status: policyv1.PolicyStatus{ + ComplianceState: policyv1.Compliant, + }, + } + Expect(kubeClient.Create(ctx, rootPolicy)).NotTo(HaveOccurred()) + + evt := &corev1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Name: "policy1.123r543243242", + Namespace: "default", + }, + InvolvedObject: v1.ObjectReference{ + Kind: string(policyv1.Kind), + Namespace: "default", + Name: "policy1", + }, + Reason: "PolicyPropagation", + Message: "Policy default/policy1 was propagated to cluster1", + Source: corev1.EventSource{ + Component: "policy-propagator", + }, + LastTimestamp: metav1.Time{Time: eventTime}, + } + Expect(kubeClient.Create(ctx, evt)).NotTo(HaveOccurred()) + + receivedEvent := <-consumer.EventChan() + fmt.Println(">>>>>>>>>>>>>>>>>>> create event1", receivedEvent) + Expect(string(enum.LocalRootPolicyEventType)).To(Equal(receivedEvent.Type())) + + rootPolicyEvents := []event.RootPolicyEvent{} + err := json.Unmarshal(receivedEvent.Data(), &rootPolicyEvents) + Expect(err).Should(Succeed()) + Expect(rootPolicyEvents[0].EventName).To(Equal(evt.Name)) + }) + + It("should skip the older root policy event", func() { + By("Create a expired event") + expiredEventName := "policy1.expired.123r543243333" + evt := &corev1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Name: expiredEventName, + Namespace: "default", + }, + InvolvedObject: v1.ObjectReference{ + Kind: string(policyv1.Kind), + Namespace: "default", + Name: "policy1", + }, + Reason: "PolicyPropagation", + Message: "Policy default/policy1 was propagated to cluster2", + Source: corev1.EventSource{ + Component: "policy-propagator", + }, + LastTimestamp: metav1.Time{Time: eventTime.Add(-5 * time.Second)}, + } + Expect(kubeClient.Create(ctx, evt)).NotTo(HaveOccurred()) + + By("Create a new event") + newerEventName := "policy1.newer.123r543245555" + evt = &corev1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Name: newerEventName, + Namespace: "default", + }, + InvolvedObject: v1.ObjectReference{ + Kind: string(policyv1.Kind), + Namespace: "default", + Name: "policy1", + }, + Reason: "PolicyPropagation", + Message: "Policy default/policy1 was propagated to cluster3", + Source: corev1.EventSource{ + Component: "policy-propagator", + }, + LastTimestamp: metav1.Time{Time: eventTime.Add(10 * time.Second)}, + } + Expect(kubeClient.Create(ctx, evt)).NotTo(HaveOccurred()) + + receivedEvent := <-consumer.EventChan() + fmt.Println(">>>>>>>>>>>>>>>>>>> create event2", receivedEvent) + Expect(string(enum.LocalRootPolicyEventType)).To(Equal(receivedEvent.Type())) + + rootPolicyEvents := []event.RootPolicyEvent{} + err := json.Unmarshal(receivedEvent.Data(), &rootPolicyEvents) + Expect(err).Should(Succeed()) + Expect(len(rootPolicyEvents)).To(Equal(1)) + Expect(rootPolicyEvents[0].EventName).NotTo(Equal(expiredEventName)) + Expect(rootPolicyEvents[0].EventName).To(Equal(newerEventName)) + }) +} diff --git a/agent/pkg/status/controller/filter/time_filter.go b/agent/pkg/status/controller/filter/time_filter.go new file mode 100644 index 000000000..97a4473e8 --- /dev/null +++ b/agent/pkg/status/controller/filter/time_filter.go @@ -0,0 +1,144 @@ +package filter + +import ( + "context" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + CACHE_CONFIG_NAME = "multicluster-global-hub-agent-sync-state" + CACHE_TIME_FORMAT = "2006-01-02 15:04:05.000000 -0700 MST m=+0.000000000" +) + +var ( + eventTimeCache = make(map[string]time.Time) + lastEventTimeCache = make(map[string]time.Time) + eventTimeCacheInterval = 5 * time.Second +) + +// CacheTime cache the latest time +func CacheTime(key string, new time.Time) { + old, ok := eventTimeCache[key] + if !ok || old.Before(new) { + eventTimeCache[key] = new + } +} + +// Newer compares the val time with cached the time, if not exist, then return true +func Newer(key string, val time.Time) bool { + old, ok := eventTimeCache[key] + if !ok { + return true + } + return val.After(old) +} + +// LaunchTimeFilter start a goroutine periodically sync the time filter cache to configMap +// and also init the event time cache with configmap +func LaunchTimeFilter(ctx context.Context, c client.Client, namespace string) error { + agentStateConfigMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: CACHE_CONFIG_NAME, + Namespace: namespace, + }, + } + err := c.Get(ctx, client.ObjectKeyFromObject(agentStateConfigMap), agentStateConfigMap) + if err != nil && errors.IsNotFound(err) { + if e := c.Create(ctx, agentStateConfigMap); e != nil { + return e + } + } else if err != nil { + return err + } + + for key := range lastEventTimeCache { + err = loadEventTimeCacheFromConfigMap(agentStateConfigMap, key) + if err != nil { + return err + } + } + + go func() { + ticker := time.NewTicker(eventTimeCacheInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + klog.Info("cancel context") + return + case <-ticker.C: + err := periodicSync(ctx, c, namespace) + if err != nil { + klog.Errorf("failed to sync the configmap %v", err) + } + } + } + }() + + return nil +} + +func periodicSync(ctx context.Context, c client.Client, namespace string) error { + // update the lastSentCache + update := false + for key, currentTime := range eventTimeCache { + lastTime, found := lastEventTimeCache[key] + if !found { + update = true + lastEventTimeCache[key] = currentTime + } + + if lastTime.Before(currentTime) { + update = true + lastEventTimeCache[key] = currentTime + } + } + + // sync the lastSentCache to ConfigMap + if update { + cm := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: CACHE_CONFIG_NAME, Namespace: namespace}} + err := c.Get(ctx, client.ObjectKeyFromObject(cm), cm) + if err != nil { + return err + } + if cm.Data == nil { + cm.Data = map[string]string{} + } + for key, val := range lastEventTimeCache { + cm.Data[key] = val.Format(CACHE_TIME_FORMAT) + } + err = c.Update(ctx, cm, &client.UpdateOptions{}) + if err != nil { + return err + } + } + return nil +} + +// RegisterTimeFilter call before the LaunchTimeFilter, it will get the init time from the configMap +func RegisterTimeFilter(key string) { + eventTimeCache[key] = time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC) + lastEventTimeCache[key] = time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC) +} + +func loadEventTimeCacheFromConfigMap(cm *corev1.ConfigMap, key string) error { + val, found := cm.Data[key] + if !found { + klog.Info("the time cache isn't found in the ConfigMap", "key", key, "configMap", cm.Name) + return nil + } + + timeVal, err := time.Parse(CACHE_TIME_FORMAT, val) + if err != nil { + return err + } + eventTimeCache[key] = timeVal + return nil +} diff --git a/agent/pkg/status/controller/filter/time_filter_test.go b/agent/pkg/status/controller/filter/time_filter_test.go new file mode 100644 index 000000000..f817cb62a --- /dev/null +++ b/agent/pkg/status/controller/filter/time_filter_test.go @@ -0,0 +1,136 @@ +// Copyright (c) 2024 Red Hat, Inc. +// Copyright Contributors to the Open Cluster Management project + +package filter + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + + "github.com/stolostron/multicluster-global-hub/pkg/utils" +) + +var ( + cfg *rest.Config + kubeClient kubernetes.Interface + runtimeClient client.Client +) + +func TestMain(m *testing.M) { + err := os.Setenv("POD_NAMESPACE", "default") + if err != nil { + panic(err) + } + err = os.Setenv("MANAGER_TESTING", "true") + if err != nil { + panic(err) + } + + // start testenv + testenv := &envtest.Environment{} + + cfg, err = testenv.Start() + if err != nil { + panic(err) + } + + if cfg == nil { + panic(fmt.Errorf("empty kubeconfig!")) + } + + kubeClient, err = kubernetes.NewForConfig(cfg) + if err != nil { + panic(err) + } + + runtimeClient, err = client.New(cfg, client.Options{}) + if err != nil { + panic(err) + } + + // run testings + code := m.Run() + + // stop testenv + if err := testenv.Stop(); err != nil { + panic(err) + } + os.Exit(code) +} + +func TestTimeFilter(t *testing.T) { + // init the time cache to configMap + initCtx, initCancel := context.WithCancel(context.Background()) + + eventTimeCacheInterval = 1 * time.Second + err := LaunchTimeFilter(initCtx, runtimeClient, "default") + assert.Nil(t, err) + + // the configMap can be created + cm := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: CACHE_CONFIG_NAME, Namespace: "default"}} + err = runtimeClient.Get(initCtx, client.ObjectKeyFromObject(cm), cm) + assert.Nil(t, err) + utils.PrettyPrint(cm) + + // update the cache time + testTime := time.Now() + fmt.Println("test time", testTime) + CacheTime("test", testTime) + + time.Sleep(2 * time.Second) + + // check the cache time is synced to the configMap + err = runtimeClient.Get(initCtx, client.ObjectKeyFromObject(cm), cm) + assert.Nil(t, err) + cachedTimeStr := cm.Data["test"] + cachedTime, err := time.Parse(CACHE_TIME_FORMAT, cachedTimeStr) + assert.Nil(t, err) + fmt.Println("cached time", cachedTime) + assert.True(t, cachedTime.Equal(testTime)) + initCancel() + + // reload the time from configMap + RegisterTimeFilter("test") + reloadCtx, reloadCancel := context.WithCancel(context.Background()) + defer reloadCancel() + + // update the configmap + updateTime := testTime.Add(5 * time.Second) + cm.Data["test"] = updateTime.Format(CACHE_TIME_FORMAT) + err = runtimeClient.Update(reloadCtx, cm) + assert.Nil(t, err) + + err = LaunchTimeFilter(reloadCtx, runtimeClient, "default") + assert.Nil(t, err) + + err = runtimeClient.Get(reloadCtx, client.ObjectKeyFromObject(cm), cm) + assert.Nil(t, err) + cachedTimeStr = cm.Data["test"] + cachedTime, err = time.Parse(CACHE_TIME_FORMAT, cachedTimeStr) + assert.Nil(t, err) + fmt.Println("updated time1", cachedTime) + assert.True(t, cachedTime.Equal(updateTime)) + + // update the cache with a expired time, verify the cached time isn't changed + CacheTime("test", updateTime.Add(-10*time.Second)) + time.Sleep(2 * time.Second) + + err = runtimeClient.Get(reloadCtx, client.ObjectKeyFromObject(cm), cm) + assert.Nil(t, err) + cachedTimeStr = cm.Data["test"] + cachedTime, err = time.Parse(CACHE_TIME_FORMAT, cachedTimeStr) + assert.Nil(t, err) + fmt.Println("updated time2", cachedTime) + assert.True(t, cachedTime.Equal(updateTime)) +} diff --git a/agent/pkg/status/controller/generic/generic_event_syncer.go b/agent/pkg/status/controller/generic/generic_event_syncer.go index e8d62c88c..c3ad2e875 100644 --- a/agent/pkg/status/controller/generic/generic_event_syncer.go +++ b/agent/pkg/status/controller/generic/generic_event_syncer.go @@ -64,6 +64,7 @@ func (s *genericEventSyncer) periodicSync() { currentSyncInterval := s.syncIntervalFunc() s.log.Info(fmt.Sprintf("sync interval has been reset to %s", currentSyncInterval.String())) ticker := time.NewTicker(currentSyncInterval) + defer ticker.Stop() for { <-ticker.C // wait for next time interval diff --git a/agent/pkg/status/controller/generic/generic_object_syncer.go b/agent/pkg/status/controller/generic/generic_object_syncer.go index cb7f6b09a..e53b13fbb 100644 --- a/agent/pkg/status/controller/generic/generic_object_syncer.go +++ b/agent/pkg/status/controller/generic/generic_object_syncer.go @@ -124,6 +124,7 @@ func (c *genericObjectSyncer) deleteObject(object client.Object) { func (c *genericObjectSyncer) periodicSync() { currentSyncInterval := c.syncIntervalFunc() ticker := time.NewTicker(currentSyncInterval) + defer ticker.Stop() for { <-ticker.C // wait for next time interval diff --git a/agent/pkg/status/controller/policies/status_event_emitter.go b/agent/pkg/status/controller/policies/status_event_emitter.go index e746aefdf..a7d05086e 100644 --- a/agent/pkg/status/controller/policies/status_event_emitter.go +++ b/agent/pkg/status/controller/policies/status_event_emitter.go @@ -8,13 +8,13 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/go-logr/logr" - lru "github.com/hashicorp/golang-lru" corev1 "k8s.io/api/core/v1" policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/config" + "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/filter" "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/generic" "github.com/stolostron/multicluster-global-hub/pkg/bundle/event" eventversion "github.com/stolostron/multicluster-global-hub/pkg/bundle/version" @@ -30,13 +30,13 @@ var ( type statusEventEmitter struct { ctx context.Context + name string log logr.Logger eventType string runtimeClient client.Client currentVersion *eventversion.Version lastSentVersion eventversion.Version payload event.ReplicatedPolicyEventBundle - cache *lru.Cache topic string predicate func(client.Object) bool } @@ -48,16 +48,17 @@ func StatusEventEmitter( c client.Client, topic string, ) generic.ObjectEmitter { - cache, _ := lru.New(30) + name := strings.Replace(string(eventType), enum.EventTypePrefix, "", -1) + filter.RegisterTimeFilter(name) return &statusEventEmitter{ ctx: ctx, - log: ctrl.Log.WithName("policy-syncer/status-event"), + name: name, + log: ctrl.Log.WithName(name), eventType: string(eventType), topic: topic, runtimeClient: c, currentVersion: eventversion.NewVersion(), lastSentVersion: *eventversion.NewVersion(), - cache: cache, payload: make([]event.ReplicatedPolicyEvent, 0), predicate: predicate, } @@ -99,8 +100,8 @@ func (h *statusEventEmitter) Update(obj client.Object) bool { for _, detail := range policy.Status.Details { if detail.History != nil { for _, evt := range detail.History { - key := fmt.Sprintf("%s.%s", evt.EventName, evt.LastTimestamp) - if h.cache.Contains(key) { + // if the event time is older thant the filter cached sent event time, then skip it + if !filter.Newer(h.name, evt.LastTimestamp.Time) { continue } @@ -120,7 +121,6 @@ func (h *statusEventEmitter) Update(obj client.Object) bool { ClusterID: clusterID, Compliance: GetComplianceState(MessageCompliaceStateRegex, evt.Message, string(detail.ComplianceState)), }) - h.cache.Add(key, nil) updated = true } } @@ -146,6 +146,10 @@ func (h *statusEventEmitter) ToCloudEvent() (*cloudevents.Event, error) { } func (h *statusEventEmitter) PostSend() { + // update the time filter: with latest event + for _, evt := range h.payload { + filter.CacheTime(h.name, evt.CreatedAt.Time) + } // update version and clean the cache h.payload = make([]event.ReplicatedPolicyEvent, 0) h.currentVersion.Next()