diff --git a/manager/pkg/statussyncer/conflator/conflation_priority.go b/manager/pkg/statussyncer/conflator/conflation_priority.go index 1abbf2f86..da2dc99f5 100644 --- a/manager/pkg/statussyncer/conflator/conflation_priority.go +++ b/manager/pkg/statussyncer/conflator/conflation_priority.go @@ -8,6 +8,7 @@ const ( HubClusterHeartbeatPriority ConflationPriority = iota HubClusterInfoPriority ConflationPriority = iota ManagedClustersPriority ConflationPriority = iota + ManagedClusterEventPriority ConflationPriority = iota LocalPolicySpecPriority ConflationPriority = iota LocalCompliancePriority ConflationPriority = iota LocalCompleteCompliancePriority ConflationPriority = iota diff --git a/manager/pkg/statussyncer/syncers.go b/manager/pkg/statussyncer/syncers.go index 04c37ab79..aab03111d 100644 --- a/manager/pkg/statussyncer/syncers.go +++ b/manager/pkg/statussyncer/syncers.go @@ -47,6 +47,7 @@ func registerHandler(cmr *conflator.ConflationManager, enableGlobalResource bool dbsyncer.NewHubClusterHeartbeatHandler().RegisterHandler(cmr) dbsyncer.NewHubClusterInfoHandler().RegisterHandler(cmr) dbsyncer.NewManagedClusterHandler().RegisterHandler(cmr) + dbsyncer.NewManagedClusterEventHandler().RegisterHandler(cmr) dbsyncer.NewLocalPolicySpecHandler().RegisterHandler(cmr) dbsyncer.NewLocalPolicyComplianceHandler().RegisterHandler(cmr) dbsyncer.NewLocalPolicyCompleteHandler().RegisterHandler(cmr) diff --git a/manager/pkg/statussyncer/syncers/managedcluster_event_handler.go b/manager/pkg/statussyncer/syncers/managedcluster_event_handler.go new file mode 100644 index 000000000..3b33e88c5 --- /dev/null +++ b/manager/pkg/statussyncer/syncers/managedcluster_event_handler.go @@ -0,0 +1,77 @@ +package dbsyncer + +import ( + "context" + "fmt" + "strings" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/go-logr/logr" + "gorm.io/gorm/clause" + ctrl "sigs.k8s.io/controller-runtime" + + "github.com/stolostron/multicluster-global-hub/manager/pkg/statussyncer/conflator" + "github.com/stolostron/multicluster-global-hub/pkg/bundle/event" + eventversion "github.com/stolostron/multicluster-global-hub/pkg/bundle/version" + "github.com/stolostron/multicluster-global-hub/pkg/database" + "github.com/stolostron/multicluster-global-hub/pkg/enum" +) + +type managedClusterEventHandler struct { + log logr.Logger + eventType string + eventSyncMode enum.EventSyncMode + eventPriority conflator.ConflationPriority +} + +func NewManagedClusterEventHandler() conflator.Handler { + eventType := string(enum.ManagedClusterEventType) + logName := strings.Replace(eventType, enum.EventTypePrefix, "", -1) + return &managedClusterEventHandler{ + log: ctrl.Log.WithName(logName), + eventType: eventType, + eventSyncMode: enum.DeltaStateMode, + eventPriority: conflator.ManagedClusterEventPriority, + } +} + +func (h *managedClusterEventHandler) RegisterHandler(conflationManager *conflator.ConflationManager) { + conflationManager.Register(conflator.NewConflationRegistration( + h.eventPriority, + h.eventSyncMode, + h.eventType, + h.handleEvent, + )) +} + +func (h *managedClusterEventHandler) handleEvent(ctx context.Context, evt *cloudevents.Event) error { + version := evt.Extensions()[eventversion.ExtVersion] + leafHubName := evt.Source() + h.log.V(2).Info(startMessage, "type", evt.Type(), "LH", evt.Source(), "version", version) + + managedClusterEvents := event.ManagedClusterEventBundle{} + if err := evt.DataAs(&managedClusterEvents); err != nil { + return err + } + + for _, managedClusterEvent := range managedClusterEvents { + managedClusterEvent.LeafHubName = leafHubName + } + + if len(managedClusterEvents) <= 0 { + h.log.Info("empty managed cluster event payload", "event", evt) + return nil + } + + db := database.GetGorm() + err := db.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "leaf_hub_name"}, {Name: "event_name"}, {Name: "created_at"}}, + DoNothing: true, + }).CreateInBatches(managedClusterEvents, 100).Error + if err != nil { + return fmt.Errorf("failed handling leaf hub LocalPolicyStatusEvent event - %w", err) + } + + h.log.V(2).Info(finishMessage, "type", evt.Type(), "LH", evt.Source(), "version", version) + return nil +} diff --git a/manager/pkg/statussyncer/syncers/managedcluster_event_handler_test.go b/manager/pkg/statussyncer/syncers/managedcluster_event_handler_test.go new file mode 100644 index 000000000..0ff3fe476 --- /dev/null +++ b/manager/pkg/statussyncer/syncers/managedcluster_event_handler_test.go @@ -0,0 +1,65 @@ +package dbsyncer_test + +import ( + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/stolostron/multicluster-global-hub/pkg/bundle/event" + eventversion "github.com/stolostron/multicluster-global-hub/pkg/bundle/version" + "github.com/stolostron/multicluster-global-hub/pkg/database" + "github.com/stolostron/multicluster-global-hub/pkg/database/models" + "github.com/stolostron/multicluster-global-hub/pkg/enum" +) + +// go test ./manager/pkg/statussyncer/syncers -v -ginkgo.focus "ManagedClusterEventHandler" +var _ = Describe("ManagedClusterEventHandler", Ordered, func() { + It("should be able to sync replicate policy event", func() { + By("Create hubClusterInfo event") + + leafHubName := "hub1" + version := eventversion.NewVersion() + version.Incr() + + data := event.ManagedClusterEventBundle{} + data = append(data, models.ManagedClusterEvent{ + EventNamespace: "managed-cluster1", + EventName: "managed-cluster1.17cd5c3642c43a8a", + ClusterID: "13b2e003-2bdf-4c82-9bdf-f1aa7ccf608d", + LeafHubName: "hub1", + Message: "The managed cluster (managed-cluster1) cannot connect to the hub cluster.", + Reason: "AvailableUnknown", + ReportingController: "registration-controller", + ReportingInstance: "registration-controller-cluster-manager-registration-controller-6794cf54d9-j7lgm", + EventType: "Warning", + CreatedAt: time.Now(), + }) + + evt := ToCloudEvent(leafHubName, string(enum.ManagedClusterEventType), version, data) + + By("Sync event with transport") + err := producer.SendEvent(ctx, *evt) + Expect(err).Should(Succeed()) + + By("Check the leaf hubs table") + Eventually(func() error { + db := database.GetGorm() + items := []models.ManagedClusterEvent{} + if err := db.Find(&items).Error; err != nil { + return err + } + + count := 0 + for _, item := range items { + fmt.Println(item.LeafHubName, item.EventName, item.Message, item.CreatedAt) + count++ + } + if count > 0 { + return nil + } + return fmt.Errorf("not found expected resource on the table") + }, 30*time.Second, 100*time.Millisecond).ShouldNot(HaveOccurred()) + }) +}) diff --git a/manager/pkg/statussyncer/syncers/managed_clusters_handler.go b/manager/pkg/statussyncer/syncers/managedcluster_handler.go similarity index 100% rename from manager/pkg/statussyncer/syncers/managed_clusters_handler.go rename to manager/pkg/statussyncer/syncers/managedcluster_handler.go diff --git a/manager/pkg/statussyncer/syncers/managed_clusters_handler_test.go b/manager/pkg/statussyncer/syncers/managedcluster_handler_test.go similarity index 100% rename from manager/pkg/statussyncer/syncers/managed_clusters_handler_test.go rename to manager/pkg/statussyncer/syncers/managedcluster_handler_test.go diff --git a/operator/pkg/controllers/hubofhubs/database/2.tables.sql b/operator/pkg/controllers/hubofhubs/database/2.tables.sql index a276e38a4..35af3b02b 100644 --- a/operator/pkg/controllers/hubofhubs/database/2.tables.sql +++ b/operator/pkg/controllers/hubofhubs/database/2.tables.sql @@ -59,6 +59,20 @@ CREATE TABLE IF NOT EXISTS status.leaf_hubs ( CREATE INDEX IF NOT EXISTS leafhub_deleted_at_idx ON status.leaf_hubs (deleted_at); -- Partition tables +CREATE TABLE IF NOT EXISTS event.managed_clusters ( + event_namespace text NOT NULL, + event_name text NOT NULL, + cluster_id uuid NOT NULL, + leaf_hub_name character varying(256) NOT NULL, + message text, + reason text, + reporting_controller text, + reporting_instance text, + event_type character varying(64) NOT NULL, + created_at timestamp without time zone DEFAULT now() NOT NULL, + CONSTRAINT managed_clusters_unique_constraint UNIQUE (leaf_hub_name, event_name, created_at) +) PARTITION BY RANGE (created_at); + CREATE TABLE IF NOT EXISTS event.local_policies ( event_name text NOT NULL, policy_id uuid NOT NULL, diff --git a/operator/pkg/controllers/hubofhubs/database/4.trigger.sql b/operator/pkg/controllers/hubofhubs/database/4.trigger.sql index d16f2c955..7017a90c7 100644 --- a/operator/pkg/controllers/hubofhubs/database/4.trigger.sql +++ b/operator/pkg/controllers/hubofhubs/database/4.trigger.sql @@ -16,8 +16,11 @@ EXECUTE FUNCTION public.update_local_compliance_cluster_id(); SELECT create_monthly_range_partitioned_table('event.local_root_policies', to_char(current_date, 'YYYY-MM-DD')); SELECT create_monthly_range_partitioned_table('event.local_policies', to_char(current_date, 'YYYY-MM-DD')); SELECT create_monthly_range_partitioned_table('history.local_compliance', to_char(current_date, 'YYYY-MM-DD')); +SELECT create_monthly_range_partitioned_table('event.managed_clusters', to_char(current_date, 'YYYY-MM-DD')); --- create the previous month partitioned tables for receiving the data from the previous month SELECT create_monthly_range_partitioned_table('event.local_root_policies', to_char(current_date - interval '1 month', 'YYYY-MM-DD')); SELECT create_monthly_range_partitioned_table('event.local_policies', to_char(current_date - interval '1 month', 'YYYY-MM-DD')); SELECT create_monthly_range_partitioned_table('history.local_compliance', to_char(current_date - interval '1 month', 'YYYY-MM-DD')); +SELECT create_monthly_range_partitioned_table('event.managed_clusters', to_char(current_date - interval '1 month', 'YYYY-MM-DD')); + diff --git a/pkg/bundle/event/managedcluster_event_bundle.go b/pkg/bundle/event/managedcluster_event_bundle.go new file mode 100644 index 000000000..48fe37108 --- /dev/null +++ b/pkg/bundle/event/managedcluster_event_bundle.go @@ -0,0 +1,5 @@ +package event + +import "github.com/stolostron/multicluster-global-hub/pkg/database/models" + +type ManagedClusterEventBundle []models.ManagedClusterEvent diff --git a/pkg/database/models/event.go b/pkg/database/models/event.go index a8ced4a21..d9f12d4e8 100644 --- a/pkg/database/models/event.go +++ b/pkg/database/models/event.go @@ -48,3 +48,20 @@ type DataRetentionJobLog struct { func (DataRetentionJobLog) TableName() string { return "event.data_retention_job_log" } + +type ManagedClusterEvent struct { + EventNamespace string `gorm:"column:event_namespace;type:varchar(63);not null" json:"eventNamespace"` + EventName string `gorm:"column:event_name;type:varchar(63);not null" json:"eventName"` + ClusterID string `gorm:"column:cluster_id;type:uuid;not null" json:"clusterId"` + LeafHubName string `gorm:"size:256;not null" json:"-"` + Message string `gorm:"column:message;type:text" json:"message"` + Reason string `gorm:"column:reason;type:text" json:"reason"` + ReportingController string `gorm:"column:reporting_controller;type:text" json:"reportingController"` + ReportingInstance string `gorm:"column:reporting_instance;type:text" json:"reportingInstance"` + EventType string `gorm:"size:256;not null" json:"type"` + CreatedAt time.Time `gorm:"column:created_at;default:now();not null" json:"createdAt"` +} + +func (ManagedClusterEvent) TableName() string { + return "event.managed_clusters" +} diff --git a/pkg/enum/event_type.go b/pkg/enum/event_type.go index c0ecaa2a5..e3e44599d 100644 --- a/pkg/enum/event_type.go +++ b/pkg/enum/event_type.go @@ -33,4 +33,6 @@ const ( LocalPlacementRuleSpecType EventType = "io.open-cluster-management.operator.multiclusterglobalhubs.placementrule.localspec" PlacementRuleSpecType EventType = "io.open-cluster-management.operator.multiclusterglobalhubs.placementrule.spec" PlacementSpecType EventType = "io.open-cluster-management.operator.multiclusterglobalhubs.placement.spec" + + ManagedClusterEventType EventType = "io.open-cluster-management.operator.multiclusterglobalhubs.event.managedcluster" )