Skip to content

Commit

Permalink
Update event schema to comply with the propposed structure (project-k…
Browse files Browse the repository at this point in the history
…essel#120)

* Update event schema to comply with the propposed structure

* - Use structured instead of binary format when sending kafka events
  In binary, the cloudevent envelope goes as headers
  In structured, the cloudevent is a full json that goes in the payload

- Include the resource id in the event message

* Update local config to include public url

* Sets a default public url

* Update content type and subject format
  • Loading branch information
josejulio authored Sep 17, 2024
1 parent 508fc28 commit 405b039
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 47 deletions.
1 change: 1 addition & 0 deletions .inventory-api.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
server:
public_url: http://localhost:8081
http:
address: localhost:8081
grpc:
Expand Down
2 changes: 1 addition & 1 deletion cmd/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func NewCommand(
// construct eventing
// Note that we pass the server id here to act as the Source URI in cloudevents
// If a server ID isn't configured explicitly, `os.Hostname()` is used.
eventingManager, err := eventing.New(eventingConfig, serverConfig.Options.Id, log.NewHelper(log.With(logger, "subsystem", "eventing")))
eventingManager, err := eventing.New(eventingConfig, serverConfig.Options.PublicUrl, log.NewHelper(log.With(logger, "subsystem", "eventing")))
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions internal/data/hosts/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (r *hostsRepo) Save(ctx context.Context, model *biz.Host) (*biz.Host, error
if r.Eventer != nil {
// TODO: Update the Object that's sent. This is going to be what we actually emit.
producer, _ := r.Eventer.Lookup(identity, biz.ResourceType, model.ID)
evt := eventingapi.NewAddEvent(biz.ResourceType, model.Metadata.UpdatedAt, &eventingapi.EventResource[struct{}]{
evt := eventingapi.NewCreatedResourceEvent(biz.ResourceType, model.Metadata.Reporters[0].LocalResourceId, model.Metadata.UpdatedAt, &eventingapi.EventResource[struct{}]{
Metadata: &model.Metadata,
ReporterData: model.Metadata.Reporters[0],
})
Expand Down Expand Up @@ -72,7 +72,7 @@ func (r *hostsRepo) Update(ctx context.Context, model *biz.Host, id string) (*bi

if r.Eventer != nil {
producer, _ := r.Eventer.Lookup(identity, biz.ResourceType, model.ID)
evt := eventingapi.NewUpdateEvent(biz.ResourceType, model.Metadata.UpdatedAt, &eventingapi.EventResource[struct{}]{
evt := eventingapi.NewUpdatedResourceEvent(biz.ResourceType, model.Metadata.Reporters[0].LocalResourceId, model.Metadata.UpdatedAt, &eventingapi.EventResource[struct{}]{
Metadata: &model.Metadata,
ReporterData: model.Metadata.Reporters[0],
})
Expand Down Expand Up @@ -106,7 +106,7 @@ func (r *hostsRepo) Delete(ctx context.Context, id string) error {
// TODO: without persistence, we can't lookup the inventory assigned id or other model specific info.
var dummyId int64 = 0
producer, _ := r.Eventer.Lookup(identity, biz.ResourceType, dummyId)
evt := eventingapi.NewDeleteEvent(biz.ResourceType, id, time.Now().UTC(), identity)
evt := eventingapi.NewDeletedResourceEvent(biz.ResourceType, id, time.Now().UTC(), identity)
err = producer.Produce(ctx, evt)
if err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions internal/data/k8sclusters/k8sclusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (r *k8sclustersRepo) Save(ctx context.Context, model *biz.K8SCluster) (*biz

if r.Eventer != nil {
producer, _ := r.Eventer.Lookup(identity, biz.ResourceType, model.ID)
evt := eventingapi.NewAddEvent(biz.ResourceType, model.Metadata.UpdatedAt, &eventingapi.EventResource[biz.K8SClusterDetail]{
evt := eventingapi.NewCreatedResourceEvent(biz.ResourceType, model.Metadata.Reporters[0].LocalResourceId, model.Metadata.UpdatedAt, &eventingapi.EventResource[biz.K8SClusterDetail]{
Metadata: &model.Metadata,
ReporterData: model.Metadata.Reporters[0],
ResourceData: model.ResourceData,
Expand Down Expand Up @@ -69,7 +69,7 @@ func (r *k8sclustersRepo) Update(ctx context.Context, model *biz.K8SCluster, id

if r.Eventer != nil {
producer, _ := r.Eventer.Lookup(identity, biz.ResourceType, model.ID)
evt := eventingapi.NewUpdateEvent(biz.ResourceType, model.Metadata.UpdatedAt, &eventingapi.EventResource[biz.K8SClusterDetail]{
evt := eventingapi.NewUpdatedResourceEvent(biz.ResourceType, model.Metadata.Reporters[0].LocalResourceId, model.Metadata.UpdatedAt, &eventingapi.EventResource[biz.K8SClusterDetail]{
Metadata: &model.Metadata,
ReporterData: model.Metadata.Reporters[0],
ResourceData: model.ResourceData,
Expand Down Expand Up @@ -104,7 +104,7 @@ func (r *k8sclustersRepo) Delete(ctx context.Context, id string) error {
// TODO: without persistence, we can't lookup the inventory assigned id or info like the external cluster id.
var dummyId int64 = 0
producer, _ := r.Eventer.Lookup(identity, biz.ResourceType, dummyId)
evt := eventingapi.NewDeleteEvent(biz.ResourceType, id, time.Now().UTC(), identity)
evt := eventingapi.NewDeletedResourceEvent(biz.ResourceType, id, time.Now().UTC(), identity)
err = producer.Produce(ctx, evt)
if err != nil {
return err
Expand Down
9 changes: 6 additions & 3 deletions internal/data/k8spolicies/k8spolicies.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (r *k8spoliciesRepo) Save(ctx context.Context, model *biz.K8sPolicy) (*biz.
if r.Eventer != nil {
producer, _ := r.Eventer.Lookup(identity, biz.ResourceType, model.ID)
// TODO: Update the Object that's sent. This is going to be what we actually emit.
evt := eventingapi.NewAddEvent(biz.ResourceType, model.Metadata.UpdatedAt, &eventingapi.EventResource[biz.K8sPolicyDetail]{
evt := eventingapi.NewCreatedResourceEvent(biz.ResourceType, model.Metadata.Reporters[0].LocalResourceId, model.Metadata.UpdatedAt, &eventingapi.EventResource[biz.K8sPolicyDetail]{
Metadata: &model.Metadata,
ReporterData: model.Metadata.Reporters[0],
ResourceData: model.ResourceData,
Expand Down Expand Up @@ -72,7 +72,10 @@ func (r *k8spoliciesRepo) Update(ctx context.Context, model *biz.K8sPolicy, id s

if r.Eventer != nil {
producer, _ := r.Eventer.Lookup(identity, biz.ResourceType, model.ID)
evt := eventingapi.NewUpdateEvent(biz.ResourceType, model.Metadata.UpdatedAt, &eventingapi.EventResource[biz.K8sPolicyDetail]{
// Todo: model.Metadata.Reporters[0].LocalResourceId is the resourceId of the resource in the scope of the reporter.
// When we have multiple reporters we will need to make sure to publish the correct one in the eventing api.
// Is it the last one?
evt := eventingapi.NewUpdatedResourceEvent(biz.ResourceType, model.Metadata.Reporters[0].LocalResourceId, model.Metadata.UpdatedAt, &eventingapi.EventResource[biz.K8sPolicyDetail]{
Metadata: &model.Metadata,
ReporterData: model.Metadata.Reporters[0],
ResourceData: model.ResourceData,
Expand Down Expand Up @@ -107,7 +110,7 @@ func (r *k8spoliciesRepo) Delete(ctx context.Context, id string) error {
// TODO: without persistence, we can't lookup the inventory assigned id or other model specific info.
var dummyId int64 = 0
producer, _ := r.Eventer.Lookup(identity, biz.ResourceType, dummyId)
evt := eventingapi.NewDeleteEvent(biz.ResourceType, id, time.Now().UTC(), identity)
evt := eventingapi.NewDeletedResourceEvent(biz.ResourceType, id, time.Now().UTC(), identity)
err = producer.Produce(ctx, evt)
if err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (r *notificationsintegrationsRepo) Save(ctx context.Context, model *biz.Not
if r.Eventer != nil {
// TODO: Update the Object that's sent. This is going to be what we actually emit.
producer, _ := r.Eventer.Lookup(identity, biz.ResourceType, model.ID)
evt := eventingapi.NewAddEvent(biz.ResourceType, model.Metadata.UpdatedAt, &eventingapi.EventResource[struct{}]{
evt := eventingapi.NewCreatedResourceEvent(biz.ResourceType, model.Metadata.Reporters[0].LocalResourceId, model.Metadata.UpdatedAt, &eventingapi.EventResource[struct{}]{
Metadata: &model.Metadata,
ReporterData: model.Metadata.Reporters[0],
})
Expand Down Expand Up @@ -72,7 +72,7 @@ func (r *notificationsintegrationsRepo) Update(ctx context.Context, model *biz.N

if r.Eventer != nil {
producer, _ := r.Eventer.Lookup(identity, biz.ResourceType, model.ID)
evt := eventingapi.NewUpdateEvent(biz.ResourceType, model.Metadata.UpdatedAt, &eventingapi.EventResource[struct{}]{
evt := eventingapi.NewUpdatedResourceEvent(biz.ResourceType, model.Metadata.Reporters[0].LocalResourceId, model.Metadata.UpdatedAt, &eventingapi.EventResource[struct{}]{
Metadata: &model.Metadata,
ReporterData: model.Metadata.Reporters[0],
})
Expand Down Expand Up @@ -106,7 +106,7 @@ func (r *notificationsintegrationsRepo) Delete(ctx context.Context, id string) e
// TODO: without persistence, we can't lookup the inventory assigned id or other model specific info.
var dummyId int64 = 0
producer, _ := r.Eventer.Lookup(identity, biz.ResourceType, dummyId)
evt := eventingapi.NewDeleteEvent(biz.ResourceType, id, time.Now().UTC(), identity)
evt := eventingapi.NewDeletedResourceEvent(biz.ResourceType, id, time.Now().UTC(), identity)
err = producer.Produce(ctx, evt)
if err != nil {
return err
Expand Down
102 changes: 78 additions & 24 deletions internal/eventing/api/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,52 +7,63 @@ import (
biz "github.com/project-kessel/inventory-api/internal/biz/common"
)

const (
ADD string = "add"
UPDATE string = "update"
REMOVE string = "remove"
)

// TODO this is a bit of a hack for now to convert the models into the shape expected by event consumers
type EventResource[Detail any] struct {
Metadata *biz.Metadata `json:"metadata"`
ReporterData *biz.Reporter `json:"reporter_data"`
ResourceData *Detail `json:"resource_data"`
}

type Event struct {
EventType string `json:"event_type"`
const (
EventTypeResource = "resources"
EventTypeResourcesRelationship = "resources_relationship"
)

EventTime time.Time `json:"event_time"`
const (
operationTypeCreated = "created"
operationTypeUpdated = "updated"
operationTypeDeleted = "deleted"
)

// TODO: events may be sent for relationships as well as resource types.
type Event struct {
EventType string `json:"event_type"`
OperationType string `json:"operation_type"`

EventTime time.Time `json:"event_time"`
ResourceType string `json:"resource_type"`
Resource interface{} `json:"resource"`
ResourceId string `json:"resource_id"`
}

func NewAddEvent[Detail any](resourceType string, last_reported_time time.Time, obj *EventResource[Detail]) *Event {
func NewCreatedResourceEvent[Detail any](resourceType, resourceId string, lastReportedTime time.Time, obj *EventResource[Detail]) *Event {
return &Event{
EventType: ADD,
EventTime: last_reported_time,
ResourceType: resourceType,
Resource: obj,
EventType: EventTypeResource,
OperationType: operationTypeCreated,
EventTime: lastReportedTime,
ResourceType: resourceType,
ResourceId: resourceId,
Resource: obj,
}
}

func NewUpdateEvent[Detail any](resourceType string, last_reported_time time.Time, obj *EventResource[Detail]) *Event {
func NewUpdatedResourceEvent[Detail any](resourceType, resourceId string, lastReportedTime time.Time, obj *EventResource[Detail]) *Event {
return &Event{
EventType: UPDATE,
EventTime: last_reported_time,
ResourceType: resourceType,
Resource: obj,
EventType: EventTypeResource,
OperationType: operationTypeUpdated,
EventTime: lastReportedTime,
ResourceType: resourceType,
ResourceId: resourceId,
Resource: obj,
}
}

func NewDeleteEvent(resourceType string, resourceId string, last_reported_time time.Time, requester *authnapi.Identity) *Event {
func NewDeletedResourceEvent(resourceType, resourceId string, lastReportedTime time.Time, requester *authnapi.Identity) *Event {
return &Event{
EventType: REMOVE,
EventTime: last_reported_time,
ResourceType: resourceType,
EventType: EventTypeResource,
OperationType: operationTypeDeleted,
EventTime: lastReportedTime,
ResourceType: resourceType,
ResourceId: resourceId,
Resource: &EventResource[struct{}]{
Metadata: &biz.Metadata{},
ReporterData: &biz.Reporter{
Expand All @@ -64,3 +75,46 @@ func NewDeleteEvent(resourceType string, resourceId string, last_reported_time t
},
}
}

func NewCreatedResourcesRelationshipEvent[Detail any](relationshipType, subjectResourceId string, lastReportedTime time.Time, obj *EventResource[Detail]) *Event {
return &Event{
EventType: EventTypeResourcesRelationship,
OperationType: operationTypeCreated,
EventTime: lastReportedTime,
ResourceType: relationshipType,
ResourceId: subjectResourceId,
Resource: obj,
}
}

func NewUpdatedResourcesRelationshipEvent[Detail any](relationshipType, subjectResourceId string, lastReportedTime time.Time, obj *EventResource[Detail]) *Event {
return &Event{
EventType: EventTypeResourcesRelationship,
OperationType: operationTypeUpdated,
EventTime: lastReportedTime,
ResourceType: relationshipType,
ResourceId: subjectResourceId,
Resource: obj,
}
}

func NewDeletedResourcesRelationshipEvent(relationshipType, subjectResourceId string, lastReportedTime time.Time, requester *authnapi.Identity) *Event {
return &Event{
EventType: EventTypeResourcesRelationship,
OperationType: operationTypeDeleted,
EventTime: lastReportedTime,
ResourceType: relationshipType,
ResourceId: subjectResourceId,
Resource: "stub",
// Todo: We need a separate type - Metadata format is different on Relationships
// Relation: &EventResource[struct{}]{
// //Metadata: &biz.Metadata{},
// //ReporterData: &biz.Reporter{
// // ReporterID: requester.Principal,
// // ReporterType: requester.Type,
// // LocalResourceId: resourceId,
// //},
// //ResourceData: nil,
// },
}
}
29 changes: 23 additions & 6 deletions internal/eventing/kafka/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package kafka
import (
"context"
"fmt"

"github.com/go-kratos/kratos/v2/log"
"github.com/google/uuid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"strings"

confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
Expand Down Expand Up @@ -144,18 +145,26 @@ func NewProducer(manager *KafkaManager, topic string, identity *authnapi.Identit
func (p *kafkaProducer) Produce(ctx context.Context, event *api.Event) error {
e := cloudevents.NewEvent()

e.SetSource(p.Manager.Source)
e.SetSpecVersion(cloudevents.VersionV1)
e.SetType(fmt.Sprintf("%s:%s", event.EventType, event.ResourceType))
eventId, err := uuid.NewUUID() // Todo: we need to have an stable id if we implement some re-trying logic
if err != nil {
return err
}

e.SetSpecVersion(cloudevents.VersionV1)
e.SetType(makeCloudEventType(event.EventType, event.ResourceType, event.OperationType))
e.SetSource(p.Manager.Source)
e.SetID(eventId.String())
e.SetTime(event.EventTime)

err := e.SetData(cloudevents.ApplicationJSON, event.Resource)
err = e.SetData(cloudevents.ApplicationJSON, event.Resource)
if err != nil {
return err
}

ret := p.Manager.Client.Send(confluent.WithMessageKey(cecontext.WithTopic(ctx, p.Topic), p.Manager.Source), e)
e.SetDataContentType("application/json")
e.SetSubject(makeCloudEventSubject(event.EventType, event.ResourceType, event.ResourceId))

ret := p.Manager.Client.Send(confluent.WithMessageKey(cecontext.WithTopic(cloudevents.WithEncodingStructured(ctx), p.Topic), p.Manager.Source), e)
if cloudevents.IsUndelivered(ret) {
p.Logger.Infof("Failed to send %v", ret)
} else {
Expand All @@ -172,3 +181,11 @@ func (p *kafkaProducer) Produce(ctx context.Context, event *api.Event) error {
)
return ret
}

func makeCloudEventType(eventType, resourceType, operation string) string {
return fmt.Sprintf("redhat.inventory.%s.%s.%s", eventType, resourceType, operation)
}

func makeCloudEventSubject(eventType, resourceType, resourceId string) string {
return "/" + strings.Join([]string{eventType, resourceType, resourceId}, "/")
}
11 changes: 7 additions & 4 deletions internal/server/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import (
)

type Options struct {
Id string `mapstructure:"id"`
Name string `mapstructure:"name"`
Id string `mapstructure:"id"`
Name string `mapstructure:"name"`
PublicUrl string `mapstructure:"public_url"`

GrpcOptions *grpc.Options `mapstructure:"grpc"`
HttpOptions *http.Options `mapstructure:"http"`
Expand All @@ -20,8 +21,9 @@ type Options struct {
func NewOptions() *Options {
id, _ := os.Hostname()
return &Options{
Id: id,
Name: "kessel-asset-inventory",
Id: id,
Name: "kessel-asset-inventory",
PublicUrl: "http://localhost:8081",

GrpcOptions: grpc.NewOptions(),
HttpOptions: http.NewOptions(),
Expand All @@ -35,6 +37,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet, prefix string) {

fs.StringVar(&o.Id, prefix+"id", o.Id, "id of the server")
fs.StringVar(&o.Name, prefix+"name", o.Name, "name of the server")
fs.StringVar(&o.PublicUrl, prefix+"public_url", o.PublicUrl, "Public url where the server is reachable")

o.GrpcOptions.AddFlags(fs, prefix+"grpc")
o.HttpOptions.AddFlags(fs, prefix+"http")
Expand Down

0 comments on commit 405b039

Please sign in to comment.