Skip to content

Commit

Permalink
Merge pull request #5 from pannoi/auto-reconciliation-and-termination…
Browse files Browse the repository at this point in the history
…-protection

Auto reconciliation and termination protection
  • Loading branch information
pannoi authored May 17, 2023
2 parents efbd38b + 2c64d7b commit 448e93c
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 20 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-schema
namespace: default
data:
schema: |
{
Expand All @@ -74,13 +75,16 @@ apiVersion: kafka-schema-operator.pannoi/v1beta1
kind: KafkaSchema
metadata:
name: kafka-schema
namespace: default
spec:
name: testing
schemaSerializer: string
autoReconciliation: true # true = autoUpdate schema, false = for update CR should be re-created (not set => false)
terminationProtection: true # true = don't delete resources on CR deletion, false = when CR deleted, deletes all resource: ConfigMap, Schema from registry (not set => false)
data:
configRef: kafka-schema # ConfigMap
format: avro # avro/protobuf/json
compatibility: # BACKWARD | BACKWARD_TRANSITIVE | FORWARD | FORWARD_TRANSITIVE | FULL | FULL_TRANSITIVE | NONE
```

> Resource should be located in same `namespace`
> Resources (`KafkaSchema` & `ConfigMap`) should be located in same `namespace`
8 changes: 5 additions & 3 deletions api/v1beta1/kafkaschema_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ type KafkaSchemaData struct {

// KafkaSchemaSpec defines the desired state of KafkaSchema
type KafkaSchemaSpec struct {
Name string `json:"name"`
SchemaSerializer string `json:"schemaSerializer"`
Data KafkaSchemaData `json:"data"`
Name string `json:"name"`
SchemaSerializer string `json:"schemaSerializer"`
AutoReconciliation bool `json:"autoReconciliation,omitempty"`
TerminationProtection bool `json:"terminationProtection,omitempty"`
Data KafkaSchemaData `json:"data"`
}

// KafkaSchemaStatus defines the observed state of KafkaSchema
Expand Down
109 changes: 97 additions & 12 deletions controllers/kafkaschema_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ import (
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"

kafkaschemaoperatorv1beta1 "kafka-schema-operator/api/v1beta1"
)

const schemaFinilizers = "kafka-schema-operator.pannoi/finalizer"

// KafkaSchemaReconciler reconciles a KafkaSchema object
type KafkaSchemaReconciler struct {
client.Client
Expand Down Expand Up @@ -76,9 +79,32 @@ func generateSchemaCompatibilityUrl(subject string) (string, error) {
return url.String(), nil
}

func generateSchemaDeletionUrl(subject string) (string, error) {
schemaRegistryHost := os.Getenv("SCHEMA_REGISTRY_HOST")
schemaRegistryPort := os.Getenv("SCHEMA_REGISTRY_PORT")
if len(schemaRegistryHost) == 0 || len(schemaRegistryPort) == 0 {
return "", er.New("schema registry or port is not set")
}
var url strings.Builder
url.WriteString("http://")
url.WriteString(schemaRegistryHost)
url.WriteString(":")
url.WriteString(schemaRegistryPort)
url.WriteString("/config/")
url.WriteString(subject)
url.WriteString("?permanent=true")

return url.String(), nil
}

func sendHttpRequest(ctx context.Context, url string, httpMethod string, payload string) error {
log := log.FromContext(ctx)
httpReq, _ := http.NewRequest(httpMethod, url, strings.NewReader(payload))
var httpReq *http.Request
if len(payload) == 0 {
httpReq, _ = http.NewRequest(httpMethod, url, nil)
} else {
httpReq, _ = http.NewRequest(httpMethod, url, strings.NewReader(payload))
}
httpReq.Header.Set("Content-Type", "application/vnd.schemaregistry.v1+json")
if len(os.Getenv("SCHEMA_REGISTRY_KEY")) > 0 || len(os.Getenv("SCHEMA_REGISTRY_SECRET")) > 0 {
httpReq.SetBasicAuth(os.Getenv("SCHEMA_REGISTRY_KEY"), os.Getenv("SCHEMA_REGISTRY_SECRET"))
Expand Down Expand Up @@ -113,41 +139,100 @@ func (r *KafkaSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}
log.Error(err, "Failed to get Schema resource")
return ctrl.Result{Requeue: true}, err
return ctrl.Result{}, err
}

reconcileResult := ctrl.Result{}
if schema.Spec.AutoReconciliation {
reconcileResult = ctrl.Result{Requeue: true}
} else {
reconcileResult = ctrl.Result{}
}

cfg := &corev1.ConfigMap{}
err = r.Get(ctx, types.NamespacedName{Name: schema.Spec.Data.ConfigRef, Namespace: schema.Namespace}, cfg)
if err != nil {
log.Error(err, "Failed to find ConfigMap: "+schema.Spec.Data.ConfigRef)
return ctrl.Result{Requeue: true}, err
return reconcileResult, err
}

schemaKey := schema.Spec.Name + "-key"
schemaValue := schema.Spec.Name + "-value"

isKafkaSchemaMarkedDeleted := schema.GetDeletionTimestamp() != nil
if isKafkaSchemaMarkedDeleted {
if !schema.Spec.TerminationProtection {
controllerutil.RemoveFinalizer(schema, schemaFinilizers)
err = r.Update(ctx, schema)
if err != nil {
log.Error(err, "Failed to delete KafkaSchema from kubernetes: "+schema.Name)
return ctrl.Result{}, err
}
log.Info("KafkaSchema CR was deleted: " + schema.Name)
err = r.Delete(ctx, cfg)
if err != nil {
log.Error(err, "Failed to delete ConfigMap: "+schema.Spec.Data.ConfigRef)
return ctrl.Result{}, err
}
log.Info("ConfigMap was deleted: " + schema.Spec.Data.ConfigRef)
keyDeletionUrl, err := generateSchemaDeletionUrl(schemaKey)
if err != nil {
log.Error(err, "Cannot create deletion url")
return ctrl.Result{}, err
}
valueDeletionUrl, err := generateSchemaDeletionUrl(schemaValue)
if err != nil {
log.Error(err, "Cannot create deletion url")
return ctrl.Result{}, err
}
err = sendHttpRequest(ctx, keyDeletionUrl, "DELETE", "")
if err != nil {
log.Error(err, "Failed to delete schema key from registry: "+schemaKey)
return ctrl.Result{}, err
}
err = sendHttpRequest(ctx, valueDeletionUrl, "DELETE", "")
if err != nil {
log.Error(err, "Failed to delete schema value from registry: "+schemaValue)
return ctrl.Result{}, err
}
log.Info("Schema was removed from registry")
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil
}

if !controllerutil.ContainsFinalizer(schema, schemaFinilizers) {
controllerutil.AddFinalizer(schema, schemaFinilizers)
err = r.Update(ctx, schema)
if err != nil {
log.Info("Failed to update finilizers for CR: " + schema.Name)
return ctrl.Result{}, err
}
log.Info("Finilizers are set for CR: " + schema.Name)
}

keySchemaRegistryUrl, err := generateSchemaUrl(schemaKey)
if err != nil {
log.Error(err, "Cannot create registry url")
return ctrl.Result{Requeue: true}, err
return reconcileResult, err
}

valueSchemaRegistryUrl, err := generateSchemaUrl(schemaValue)
if err != nil {
log.Error(err, "Cannot create registry url")
return ctrl.Result{Requeue: true}, err
return reconcileResult, err
}

valueSchemaCompatibilityUrl, err := generateSchemaCompatibilityUrl(schemaValue)
if err != nil {
log.Error(err, "Cannot create schema compatibility url")
return ctrl.Result{Requeue: true}, err
return reconcileResult, err
}

keySchemaCompatibilityUrl, err := generateSchemaCompatibilityUrl(schemaKey)
if err != nil {
log.Error(err, "Cannot create schema compatibility url")
return ctrl.Result{Requeue: true}, err
return reconcileResult, err
}

var schemaKeyPayload strings.Builder
Expand All @@ -158,7 +243,7 @@ func (r *KafkaSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
err = sendHttpRequest(ctx, keySchemaRegistryUrl, "POST", schemaKeyPayload.String())
if err != nil {
log.Error(err, "Failed to update schema registry")
return ctrl.Result{Requeue: true}, err
return reconcileResult, err
}
log.Info("Schema key was published: " + schemaKey)

Expand All @@ -181,7 +266,7 @@ func (r *KafkaSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
err = sendHttpRequest(ctx, valueSchemaRegistryUrl, "POST", schemaValuePayload.String())
if err != nil {
log.Error(err, "Failed to update schema registry")
return ctrl.Result{Requeue: true}, err
return reconcileResult, err
}

var schemaCompatibilityPayload strings.Builder
Expand All @@ -192,17 +277,17 @@ func (r *KafkaSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
err = sendHttpRequest(ctx, valueSchemaCompatibilityUrl, "PUT", schemaCompatibilityPayload.String())
if err != nil {
log.Error(err, "Failed to update schema compatibility for value")
return ctrl.Result{Requeue: true}, err
return reconcileResult, err
}

err = sendHttpRequest(ctx, keySchemaCompatibilityUrl, "PUT", schemaCompatibilityPayload.String())
if err != nil {
log.Error(err, "Failed to update schema compatibility for key")
return ctrl.Result{Requeue: true}, err
return reconcileResult, err
}

log.Info("Schema value was published: " + schemaValue)
return ctrl.Result{Requeue: true}, nil
return reconcileResult, nil
}

// SetupWithManager sets up the controller with the Manager.
Expand Down
21 changes: 18 additions & 3 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,26 @@
# Changelog

## [0.1.1] - 2023-05-17

### Added
- `terminationProtection` (optional) to handle automatic assoiated resources if `false`
- `autoReconciliation` (optional) to handle not reconsuming CR if `false`
- Handle deletion and finilizers
- Dynamic Result.Requeue policy based on parameter

### Changed
- Updated docs
- Move CRD to `kubernetes/crd/` folder

### Fixed
- Method `sendHttp` accepts nil payload

## [0.1.0] - 2023-04-04

### Added
- Initial operator creation
- Provision Kafka Schema with Custom Resource and Configmap
- Add schema compatibility
- Initial operator creation
- Provision Kafka Schema with Custom Resource and Configmap
- Add schema compatibility

### Changed

Expand Down
4 changes: 4 additions & 0 deletions kubernetes/templates/crd.yaml → kubernetes/crds/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ spec:
spec:
description: KafkaSchemaSpec defines the desired state of KafkaSchema
properties:
autoReconciliation:
type: boolean
data:
properties:
compatibility:
Expand All @@ -52,6 +54,8 @@ spec:
type: string
schemaSerializer:
type: string
terminationProtection:
type: boolean
required:
- data
- name
Expand Down
2 changes: 1 addition & 1 deletion kubernetes/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ operator:
pullPolicy: IfNotPresent
replicas: 1
image: pannoi/kafka-schema-opereator
version: 0.1.0
version: 0.1.1

schemaRegistry:
host:
Expand Down

0 comments on commit 448e93c

Please sign in to comment.