Skip to content

Commit

Permalink
Implement TraTExclusion resource
Browse files Browse the repository at this point in the history
  • Loading branch information
kchiranjewee63 committed Aug 6, 2024
1 parent ceffa0b commit d8f92f3
Show file tree
Hide file tree
Showing 20 changed files with 886 additions and 21 deletions.
68 changes: 68 additions & 0 deletions installation/resources/crds/tratexcl-crd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: tratexclusions.tratteria.io
spec:
group: tratteria.io
names:
kind: TraTExclusion
plural: tratexclusions
singular: tratexclusion
listKind: TraTExclusionList
shortNames:
- tratexcl
scope: Namespaced
versions:
- name: v1alpha1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
apiVersion:
type: string
kind:
type: string
metadata:
type: object
spec:
type: object
required: ["service", "endpoints"]
properties:
service:
type: string
endpoints:
type: array
items:
type: object
required: ["path", "method"]
properties:
path:
type: string
method:
type: string
status:
type: object
properties:
status:
type: string
default: "PENDING"
lastErrorMessage:
type: string
nullable: true
retries:
type: integer
default: 0
additionalPrinterColumns:
- name: "Service"
type: "string"
jsonPath: ".spec.service"
- name: "Status"
type: "string"
jsonPath: ".status.status"
- name: "Age"
type: "date"
jsonPath: ".metadata.creationTimestamp"
subresources:
status: {}
4 changes: 2 additions & 2 deletions installation/resources/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ rules:
resources: ["events"]
verbs: ["create", "patch"]
- apiGroups: ["tratteria.io"]
resources: ["trats", "tratteriaconfigs"]
resources: ["trats", "tratteriaconfigs", "tratexclusions"]
verbs: ["get", "list", "watch"]
- apiGroups: ["tratteria.io"]
resources: ["trats/status", "tratteriaconfigs/status"]
resources: ["trats/status", "tratteriaconfigs/status", "tratexclusions/status"]
verbs: ["update"]
53 changes: 38 additions & 15 deletions service/servicemessagehandler/servicemessagehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ type TraTDeletionRequestMessage struct {
TraTName string
}

func (rd *ServiceMessageHandler) SetClientsRetriever(clientsRetriever websocketserver.ClientsRetriever) {
rd.clientsRetriever = clientsRetriever
func (smh *ServiceMessageHandler) SetClientsRetriever(clientsRetriever websocketserver.ClientsRetriever) {
smh.clientsRetriever = clientsRetriever
}

//nolint:unparam
func (rd *ServiceMessageHandler) sendMessage(ctx context.Context, serviceName string, namespace string, messageType websocketserver.MessageType, rule json.RawMessage, versionNumber int64) error {
clients := rd.clientsRetriever.GetClientManagers(serviceName, namespace)
func (smh *ServiceMessageHandler) sendMessage(ctx context.Context, serviceName string, namespace string, messageType websocketserver.MessageType, rule json.RawMessage, versionNumber int64) error {
clients := smh.clientsRetriever.GetClientManagers(serviceName, namespace)

var dispatchErrors []string
/*
Expand Down Expand Up @@ -63,46 +63,46 @@ func (rd *ServiceMessageHandler) sendMessage(ctx context.Context, serviceName st
return nil
}

func (rd *ServiceMessageHandler) DispatchTraTVerificationRule(ctx context.Context, serviceName string, namespace string, serviceTraTVerificationRules *v1alpha1.ServiceTraTVerificationRules, versionNumber int64) error {
func (smh *ServiceMessageHandler) DispatchTraTVerificationRule(ctx context.Context, serviceName string, namespace string, serviceTraTVerificationRules *v1alpha1.ServiceTraTVerificationRules, versionNumber int64) error {
jsonData, err := json.Marshal(serviceTraTVerificationRules)
if err != nil {
return fmt.Errorf("error marshaling verification trat rule: %w", err)
}

err = rd.sendMessage(ctx, serviceName, namespace, websocketserver.MessageTypeTraTVerificationRuleUpsertRequest, jsonData, versionNumber)
err = smh.sendMessage(ctx, serviceName, namespace, websocketserver.MessageTypeTraTVerificationRuleUpsertRequest, jsonData, versionNumber)
if err != nil {
return fmt.Errorf("error dispatching verification trat rule to %s service: %w", serviceName, err)
}

return nil
}

func (rd *ServiceMessageHandler) DeleteTraT(ctx context.Context, serviceName string, namespace string, traTName string, versionNumber int64) error {
func (smh *ServiceMessageHandler) DeleteTraT(ctx context.Context, serviceName string, namespace string, traTName string, versionNumber int64) error {
traTDeletionRequestMessage := TraTDeletionRequestMessage{TraTName: traTName}

jsonData, err := json.Marshal(traTDeletionRequestMessage)
if err != nil {
return fmt.Errorf("error marshaling verification trat rule: %w", err)
}

err = rd.sendMessage(ctx, serviceName, namespace, websocketserver.MessageTypeTraTDeletionRequest, jsonData, versionNumber)
err = smh.sendMessage(ctx, serviceName, namespace, websocketserver.MessageTypeTraTDeletionRequest, jsonData, versionNumber)
if err != nil {
return fmt.Errorf("error deleting trat from %s service: %w", serviceName, err)
}

return nil
}

func (rd *ServiceMessageHandler) DispatchTratteriaConfigVerificationRule(ctx context.Context, namespace string, verificationTokenRule *v1alpha1.TratteriaConfigVerificationRule, versionNumber int64) error {
func (smh *ServiceMessageHandler) DispatchTratteriaConfigVerificationRule(ctx context.Context, namespace string, verificationTokenRule *v1alpha1.TratteriaConfigVerificationRule, versionNumber int64) error {
jsonData, err := json.Marshal(verificationTokenRule)
if err != nil {
return fmt.Errorf("error marshaling verification tratteria config rule: %w", err)
}

var dispatchErrors []string

for _, serviceName := range rd.clientsRetriever.GetTratteriaAgentServices(namespace) {
err = rd.sendMessage(ctx, serviceName, namespace, websocketserver.MessageTypeTratteriaConfigVerificationRuleUpsertRequest, jsonData, versionNumber)
for _, serviceName := range smh.clientsRetriever.GetTratteriaAgentServices(namespace) {
err = smh.sendMessage(ctx, serviceName, namespace, websocketserver.MessageTypeTratteriaConfigVerificationRuleUpsertRequest, jsonData, versionNumber)
if err != nil {
dispatchErrors = append(dispatchErrors, fmt.Sprintf("error dispatching verification token rule to %s service: %v", serviceName, err))
}
Expand All @@ -115,30 +115,53 @@ func (rd *ServiceMessageHandler) DispatchTratteriaConfigVerificationRule(ctx con
return nil
}

func (rd *ServiceMessageHandler) DispatchTraTGenerationRule(ctx context.Context, namespace string, generationEndpointRule *v1alpha1.TraTGenerationRule, verisionNumber int64) error {
func (smh *ServiceMessageHandler) DispatchTraTGenerationRule(ctx context.Context, namespace string, generationEndpointRule *v1alpha1.TraTGenerationRule, verisionNumber int64) error {
jsonData, err := json.Marshal(generationEndpointRule)
if err != nil {
return fmt.Errorf("error marshaling generation trat rule: %w", err)
}

err = rd.sendMessage(ctx, common.TRATTERIA_SERVICE_NAME, namespace, websocketserver.MessageTypeTraTGenerationRuleUpsertRequest, jsonData, verisionNumber)
err = smh.sendMessage(ctx, common.TRATTERIA_SERVICE_NAME, namespace, websocketserver.MessageTypeTraTGenerationRuleUpsertRequest, jsonData, verisionNumber)
if err != nil {
return fmt.Errorf("error dispatching generation trat rule to tratteria: %w", err)
}

return nil
}

func (rd *ServiceMessageHandler) DispatchTratteriaConfigGenerationRule(ctx context.Context, namespace string, generationTokenRule *v1alpha1.TratteriaConfigGenerationRule, versionNumber int64) error {
func (smh *ServiceMessageHandler) DispatchTratteriaConfigGenerationRule(ctx context.Context, namespace string, generationTokenRule *v1alpha1.TratteriaConfigGenerationRule, versionNumber int64) error {
jsonData, err := json.Marshal(generationTokenRule)
if err != nil {
return fmt.Errorf("error marshaling generation tratteria config rule: %w", err)
}

err = rd.sendMessage(ctx, common.TRATTERIA_SERVICE_NAME, namespace, websocketserver.MessageTypeTratteriaConfigGenerationRuleUpsertRequest, jsonData, versionNumber)
err = smh.sendMessage(ctx, common.TRATTERIA_SERVICE_NAME, namespace, websocketserver.MessageTypeTratteriaConfigGenerationRuleUpsertRequest, jsonData, versionNumber)
if err != nil {
return fmt.Errorf("error dispatching generation tratteria config rule to tratteria: %w", err)
}

return nil
}

func (smh *ServiceMessageHandler) DispatchTraTExclRule(ctx context.Context, serviceName string, namespace string, traTExclRule *v1alpha1.TraTExclRule, versionNumber int64) error {
jsonData, err := json.Marshal(traTExclRule)
if err != nil {
return fmt.Errorf("error marshaling tratexcl rule: %w", err)
}

err = smh.sendMessage(ctx, serviceName, namespace, websocketserver.MessageTypeTraTExclRuleUpsertRequest, jsonData, versionNumber)
if err != nil {
return fmt.Errorf("error dispatching tratexcl rule to %s service: %w", serviceName, err)
}

return nil
}

func (smh *ServiceMessageHandler) DeleteTraTExcl(ctx context.Context, serviceName string, namespace string, versionNumber int64) error {
err := smh.sendMessage(ctx, serviceName, namespace, websocketserver.MessageTypeTraTExclRuleDeleteRequest, nil, versionNumber)
if err != nil {
return fmt.Errorf("error deleting tratexcl rule from %s service: %w", serviceName, err)
}

return nil
}
89 changes: 87 additions & 2 deletions service/tratteriacontroller/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ type TratteriaConfigOperation struct {
VersionNumber int64
}

type TratteriaExclOperation struct {
Type OperationType
NewTraTExcl *tratteria1alpha1.TraTExclusion
OldTraTExcl *tratteria1alpha1.TraTExclusion
VersionNumber int64
}

type ServiceHash struct {
ruleVersionNumber int64
mu sync.RWMutex
Expand Down Expand Up @@ -105,8 +112,10 @@ type Controller struct {
tratteriaclientset clientset.Interface
traTsLister listers.TraTLister
tratteriaConfigsLister listers.TratteriaConfigLister
tratExclusionsLister listers.TraTExclusionLister
traTsSynced cache.InformerSynced
tratteriaConfigsSynced cache.InformerSynced
tratExclusionsSynced cache.InformerSynced
workqueue workqueue.TypedRateLimitingInterface[any]
recorder record.EventRecorder
serviceMessageHandler *servicemessagehandler.ServiceMessageHandler
Expand All @@ -121,6 +130,7 @@ func NewController(
tratteriaclientset clientset.Interface,
traTInformer informers.TraTInformer,
tratteriaConfigInformer informers.TratteriaConfigInformer,
tratExclusionInformer informers.TraTExclusionInformer,
serviceMessageHandler *servicemessagehandler.ServiceMessageHandler,
logger *zap.Logger) *Controller {

Expand All @@ -140,8 +150,10 @@ func NewController(
tratteriaclientset: tratteriaclientset,
traTsLister: traTInformer.Lister(),
tratteriaConfigsLister: tratteriaConfigInformer.Lister(),
tratExclusionsLister: tratExclusionInformer.Lister(),
traTsSynced: traTInformer.Informer().HasSynced,
tratteriaConfigsSynced: tratteriaConfigInformer.Informer().HasSynced,
tratExclusionsSynced: tratExclusionInformer.Informer().HasSynced,
workqueue: workqueue.NewTypedRateLimitingQueue[any](workqueue.DefaultTypedControllerRateLimiter[any]()),
recorder: recorder,
serviceMessageHandler: serviceMessageHandler,
Expand All @@ -163,6 +175,12 @@ func NewController(
UpdateFunc: controller.UpdateFunc,
})

tratExclusionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.AddFunc,
UpdateFunc: controller.UpdateFunc,
DeleteFunc: controller.DeleteFunc,
})

return controller
}

Expand All @@ -186,6 +204,15 @@ func (c *Controller) AddFunc(obj interface{}) {
zap.String("name", v.Name),
zap.String("namespace", v.Namespace),
zap.Int64("version-number", versionNumber))
case *tratteria1alpha1.TraTExclusion:
versionNumber := atomic.AddInt64(&c.ruleVersionNumber, 1)

c.workqueue.Add(TratteriaExclOperation{Type: ADD, NewTraTExcl: v, VersionNumber: versionNumber})

c.logger.Info("Processing TraTExcl addition operation.",
zap.String("name", v.Name),
zap.String("namespace", v.Namespace),
zap.Int64("version-number", versionNumber))
default:
c.logger.Error("Unknown type incountered for addition operation", zap.Any("obj", obj))
}
Expand Down Expand Up @@ -242,6 +269,31 @@ func (c *Controller) UpdateFunc(oldObj, newObj interface{}) {
zap.String("name", oldV.Name),
zap.String("namespace", oldV.Namespace),
zap.Int64("version-number", versionNumber))
case *tratteria1alpha1.TraTExclusion:
newV, ok := newObj.(*tratteria1alpha1.TraTExclusion)
if !ok {
c.logger.Error("Received unexpected object type", zap.String("expected", "TraTExcl"), zap.Any("got", oldObj))

return
}

if reflect.DeepEqual(newV.Spec, oldV.Spec) {
c.logger.Debug("TraTExcl update ignored, no spec change.",
zap.String("name", newV.Name),
zap.String("namespace", newV.Namespace),
)

return
}

versionNumber := atomic.AddInt64(&c.ruleVersionNumber, 1)

c.workqueue.Add(TratteriaExclOperation{Type: UPDATE, NewTraTExcl: newV, OldTraTExcl: oldV, VersionNumber: versionNumber})

c.logger.Info("Processing TraTExcl update operation.",
zap.String("name", oldV.Name),
zap.String("namespace", oldV.Namespace),
zap.Int64("version-number", versionNumber))
default:
c.logger.Error("Unknown type incountered for updated operation", zap.Any("oldObj", oldObj), zap.Any("newObj", newObj))
}
Expand All @@ -257,6 +309,14 @@ func (c *Controller) DeleteFunc(obj interface{}) {
zap.String("name", oldV.Name),
zap.String("namespace", oldV.Namespace),
zap.Int64("version-number", versionNumber))
case *tratteria1alpha1.TraTExclusion:
versionNumber := atomic.AddInt64(&c.ruleVersionNumber, 1)

c.workqueue.Add(TratteriaExclOperation{Type: DELETE, OldTraTExcl: oldV, VersionNumber: versionNumber})
c.logger.Info("Processing TraTExcl deletion operation.",
zap.String("name", oldV.Name),
zap.String("namespace", oldV.Namespace),
zap.Int64("version-number", versionNumber))
case cache.DeletedFinalStateUnknown:
switch t := oldV.Obj.(type) {
case *tratteria1alpha1.TraT:
Expand All @@ -267,6 +327,14 @@ func (c *Controller) DeleteFunc(obj interface{}) {
zap.String("name", t.Name),
zap.String("namespace", t.Namespace),
zap.Int64("version-number", versionNumber))
case *tratteria1alpha1.TraTExclusion:
versionNumber := atomic.AddInt64(&c.ruleVersionNumber, 1)

c.workqueue.Add(TratteriaExclOperation{Type: DELETE, OldTraTExcl: t, VersionNumber: versionNumber})
c.logger.Info("Processing TraTExcl deletion operation.",
zap.String("name", t.Name),
zap.String("namespace", t.Namespace),
zap.Int64("version-number", versionNumber))
default:
c.logger.Error("Tombstone contained unknown or unexpected type", zap.Any("obj", t))
}
Expand Down Expand Up @@ -346,7 +414,7 @@ func (c *Controller) syncHandler(ctx context.Context, obj any) error {
case DELETE:
return c.handleTraTDeletion(ctx, op.OldTraT, op.VersionNumber)
default:
return fmt.Errorf("unknown TraT operation type: %s", op.Type)
return fmt.Errorf("unknown %s operation on TraT", op.Type)
}
case TratteriaConfigOperation:
switch op.Type {
Expand All @@ -355,7 +423,18 @@ func (c *Controller) syncHandler(ctx context.Context, obj any) error {
case UPDATE:
return c.handleTratteriaConfigUpsert(ctx, op.NewTratteriaConfig, op.VersionNumber)
default:
return fmt.Errorf("unknown TratteriaConfig operation type: %s", op.Type)
return fmt.Errorf("unknown %s operation on TratteriaConfig", op.Type)
}
case TratteriaExclOperation:
switch op.Type {
case ADD:
return c.handleTraTExclUpsert(ctx, op.NewTraTExcl, op.VersionNumber)
case UPDATE:
return c.handleTraTExclUpsert(ctx, op.NewTraTExcl, op.VersionNumber)
case DELETE:
return c.handleTraTExclDeletion(ctx, op.OldTraTExcl, op.VersionNumber)
default:
return fmt.Errorf("unknown %s operation on TraTExcl", op.Type)
}
}

Expand All @@ -376,9 +455,15 @@ func (c *Controller) GetActiveVerificationRules(serviceName string, namespace st
return nil, 0, err
}

traTExclRules, err := c.GetActiveTraTExclRules(serviceName, namespace)
if err != nil {
return nil, 0, err
}

return &tratteria1alpha1.VerificationRules{
TratteriaConfigVerificationRule: tratteriaConfigVerificationRule,
TraTsVerificationRules: traTsVerificationRules,
TraTExclRule: traTExclRules,
},
activeRuleVersionNumber,
nil
Expand Down
Loading

0 comments on commit d8f92f3

Please sign in to comment.