Replay the unprocessed event from transport when the manager restarts #751

Merged · 9 commits · Jan 10, 2024

Changes from all commits
3 changes: 1 addition & 2 deletions manager/pkg/statussyncer/dispatcher/transport_dispatcher.go
@@ -8,7 +8,6 @@ import (

"github.com/go-logr/logr"

"github.com/stolostron/multicluster-global-hub/pkg/bundle/metadata"
"github.com/stolostron/multicluster-global-hub/pkg/conflator"
"github.com/stolostron/multicluster-global-hub/pkg/statistics"
"github.com/stolostron/multicluster-global-hub/pkg/transport"
Expand Down Expand Up @@ -90,7 +89,7 @@ func (d *TransportDispatcher) dispatch(ctx context.Context) {
// d.conflationManager.Insert(receivedBundle, NewBundleMetadata(message.TopicPartition.Partition,
// message.TopicPartition.Offset))
d.log.V(2).Info("forward received bundle to conflation", "messageID", msgID)
d.conflationManager.Insert(receivedBundle, metadata.NewThresholdBundleStatus(3))
d.conflationManager.Insert(receivedBundle, message.BundleStatus)
}
}
}
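
For context, a sketch of the message type the dispatcher now consumes. Only the BundleStatus field is confirmed by this diff; the struct name and the other fields are illustrative assumptions:

// Sketch: the message handed to the dispatcher (hypothetical except BundleStatus).
type Message struct {
	ID      string // message identifier ("messageID" in the log line above)
	Payload []byte // encoded bundle

	// BundleStatus carries the event's transport position (topic/partition/offset),
	// so the conflation manager can commit exactly what it has processed.
	BundleStatus metadata.BundleStatus
}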
8 changes: 7 additions & 1 deletion manager/pkg/statussyncer/syncers.go
@@ -34,6 +34,12 @@ func AddStatusSyncers(mgr ctrl.Manager, managerConfig *config.ManagerConfig) (db
 	// manage all Conflation Units
 	conflationManager := conflator.NewConflationManager(conflationReadyQueue, stats)
 
+	// add kafka offset to the database periodically
+	committer := conflator.NewKafkaConflationCommitter(conflationManager.GetTransportMetadatas)
+	if err := mgr.Add(committer); err != nil {
+		return nil, fmt.Errorf("failed to add the kafka conflation committer: %w", err)
+	}
+
 	// database layer initialization - worker pool + connection pool
 	dbWorkerPool, err := workerpool.NewDBWorkerPool(stats)
 	if err != nil {
@@ -90,7 +96,7 @@
 func getTransportDispatcher(mgr ctrl.Manager, conflationManager *conflator.ConflationManager,
 	managerConfig *config.ManagerConfig, stats *statistics.Statistics,
 ) (dbsyncer.BundleRegisterable, error) {
-	consumer, err := consumer.NewGenericConsumer(managerConfig.TransportConfig)
+	consumer, err := consumer.NewGenericConsumer(managerConfig.TransportConfig, consumer.WithDatabasePosition(true))
 	if err != nil {
 		return nil, fmt.Errorf("failed to initialize transport consumer: %w", err)
 	}
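
The new WithDatabasePosition(true) option tells the consumer to resume from the positions persisted in the status.transport table (added below) rather than from the consumer group's committed offsets. A minimal sketch of that startup path, assuming confluent-kafka-go; resumeFromDatabase and its positions argument are hypothetical:

package consumer

import (
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"

	"github.com/stolostron/multicluster-global-hub/pkg/bundle/metadata"
)

// resumeFromDatabase is a hypothetical helper illustrating what
// WithDatabasePosition(true) enables: instead of trusting the consumer group's
// committed offsets, assign each topic/partition at the position stored in
// status.transport so unprocessed events are replayed.
func resumeFromDatabase(c *kafka.Consumer, positions []metadata.TransportPosition) error {
	tps := make([]kafka.TopicPartition, 0, len(positions))
	for _, p := range positions {
		topic := p.Topic // copy: TopicPartition takes a pointer
		tps = append(tps, kafka.TopicPartition{
			Topic:     &topic,
			Partition: p.Partition,
			Offset:    kafka.Offset(p.Offset), // stored value is the first unprocessed offset
		})
	}
	return c.Assign(tps) // overrides the group offsets with the stored positions
}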
8 changes: 8 additions & 0 deletions operator/pkg/controllers/hubofhubs/database/2.tables.sql
@@ -114,3 +114,11 @@ CREATE TABLE IF NOT EXISTS history.local_compliance_job_log (
     offsets int8,
     error TEXT
 );
+
+CREATE TABLE IF NOT EXISTS status.transport (
+    -- transport name, it is the topic name for the kafka transport
+    name character varying(254) PRIMARY KEY,
+    payload jsonb NOT NULL,
+    created_at timestamp without time zone DEFAULT now() NOT NULL,
+    updated_at timestamp without time zone DEFAULT now() NOT NULL
+);

Review thread on "CREATE TABLE IF NOT EXISTS status.transport (":

Contributor: Need to handle the upgrade case.

@yanmxa (Member, Author) · Jan 10, 2024: @clyang82 What do you mean by the upgrade case? The table hasn't been created before, so it is only created if the database doesn't already contain it. And the operator will handle the upgrade.

Review thread on "name character varying(254) PRIMARY KEY,":

Contributor: topic name or managed hub cluster name?

@yanmxa (Member, Author) · Jan 10, 2024: It's the topic name, since this table is only used by the transport. Sample contents:

hoh=# select * from status.transport;
       name       |            payload             |         created_at         |         updated_at
------------------+--------------------------------+----------------------------+----------------------------
 status.kind-hub1 | {"offset": 16, "partition": 0} | 2024-01-04 02:37:05.56802  | 2024-01-09 07:27:48.741223
 status.kind-hub2 | {"offset": 16, "partition": 0} | 2024-01-04 02:37:05.56802  | 2024-01-09 07:27:48.741223
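
For reference, when the committer (added below in pkg/conflator/conflation_committer.go) persists a position, GORM's clause.OnConflict{UpdateAll: true} turns the insert into an upsert on this primary key. The effective SQL is roughly the following; this is an illustration, not taken from the PR, and the exact column list depends on the models.Transport definition:

INSERT INTO status.transport (name, payload, created_at, updated_at)
VALUES ('status.kind-hub1', '{"partition": 0, "offset": 17}', now(), now())
ON CONFLICT (name) DO UPDATE
    SET payload    = EXCLUDED.payload,
        updated_at = EXCLUDED.updated_at;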
65 changes: 8 additions & 57 deletions pkg/bundle/metadata/bundle_status.go
@@ -14,63 +14,14 @@ type BundleStatus interface {
 	MarkAsUnprocessed()
 }
 
-// NewGenericBundleStatus returns a new instance of BaseBundleStatus
-func NewGenericBundleStatus() *GenericBundleStatus {
-	return &GenericBundleStatus{
-		processed: false,
-	}
+type TransportMetadata interface {
+	BundleStatus
+	// only support kafka for now
+	GetTransportMetadata() *TransportPosition
 }
 
-// GenericBundleStatus wraps the shared data/functionality that the different transport BundleMetadata implementations-
-// can be based on.
-type GenericBundleStatus struct {
-	processed bool
-}
-
-// MarkAsProcessed function that marks the metadata as processed.
-func (genericBundleStatus *GenericBundleStatus) MarkAsProcessed() {
-	genericBundleStatus.processed = true
-}
-
-// MarkAsUnprocessed function that marks the metadata as processed.
-func (genericBundleStatus *GenericBundleStatus) MarkAsUnprocessed() {
-	genericBundleStatus.processed = false
-}
-
-// Processed returns whether the bundle was processed or not.
-func (genericBundleStatus *GenericBundleStatus) Processed() bool {
-	return genericBundleStatus.processed
-}
-
-// using threshold to indicate the bundle processed status
-// the count means the retry times.
-// 0 - the default initial value
-// 1, 2, 3 ... - the retry times of current bundle has been failed processed
-// -1 means it processed successfully
-type thresholdBundleStatus struct {
-	maxRetry int
-	count    int
-}
-
-// the retry times(max) when the bundle has been failed processed
-func NewThresholdBundleStatus(max int) *thresholdBundleStatus {
-	return &thresholdBundleStatus{
-		maxRetry: max,
-		count:    0,
-	}
-}
-
-// MarkAsProcessed function that marks the metadata as processed.
-func (s *thresholdBundleStatus) MarkAsProcessed() {
-	s.count = -1
-}
-
-// Processed returns whether the bundle was processed or not.
-func (s *thresholdBundleStatus) Processed() bool {
-	return s.count == -1 || s.count >= s.maxRetry
-}
-
-// MarkAsUnprocessed function that marks the metadata as processed.
-func (s *thresholdBundleStatus) MarkAsUnprocessed() {
-	s.count++
+type TransportPosition struct {
+	Topic     string `json:"-"`
+	Partition int32  `json:"partition"`
+	Offset    int64  `json:"offset"`
 }
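
The json tags explain the payload format in the status.transport sample shown earlier: Topic is excluded from the JSON (json:"-") because it is already stored in the table's name column, so only partition and offset are serialized. A quick self-contained check:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/stolostron/multicluster-global-hub/pkg/bundle/metadata"
)

func main() {
	pos := metadata.TransportPosition{Topic: "status.kind-hub1", Partition: 0, Offset: 16}
	b, _ := json.Marshal(pos)
	fmt.Println(string(b)) // {"partition":0,"offset":16}, Topic omitted via json:"-"
}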
29 changes: 29 additions & 0 deletions pkg/bundle/metadata/status/generic_bundle_status.go
@@ -0,0 +1,29 @@
package status

// NewGenericBundleStatus returns a new instance of GenericBundleStatus
func NewGenericBundleStatus() *GenericBundleStatus {
	return &GenericBundleStatus{
		processed: false,
	}
}

// GenericBundleStatus wraps the shared data/functionality that the different transport
// BundleMetadata implementations can be based on.
type GenericBundleStatus struct {
	processed bool
}

// MarkAsProcessed marks the metadata as processed.
func (genericBundleStatus *GenericBundleStatus) MarkAsProcessed() {
	genericBundleStatus.processed = true
}

// MarkAsUnprocessed marks the metadata as unprocessed.
func (genericBundleStatus *GenericBundleStatus) MarkAsUnprocessed() {
	genericBundleStatus.processed = false
}

// Processed returns whether the bundle was processed or not.
func (genericBundleStatus *GenericBundleStatus) Processed() bool {
	return genericBundleStatus.processed
}
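
Note that GenericBundleStatus satisfies only BundleStatus, not TransportMetadata, so the committer's type assertion (see metadataToCommit below) skips it; only ThresholdBundleStatus (next file) carries a transport position. Compile-time assertions make the relationship explicit; this is a sketch, not part of the PR:

package status

import "github.com/stolostron/multicluster-global-hub/pkg/bundle/metadata"

// Compile-time checks: the generic status implements only the base interface,
// while the threshold status also exposes its kafka position.
var (
	_ metadata.BundleStatus      = (*GenericBundleStatus)(nil)
	_ metadata.BundleStatus      = (*ThresholdBundleStatus)(nil)
	_ metadata.TransportMetadata = (*ThresholdBundleStatus)(nil)
)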
87 changes: 87 additions & 0 deletions pkg/bundle/metadata/status/threshold_bundle_status.go
@@ -0,0 +1,87 @@
package status

import (
	"strconv"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/cloudevents/sdk-go/v2/types"
	ctrl "sigs.k8s.io/controller-runtime"

	"github.com/stolostron/multicluster-global-hub/pkg/bundle/metadata"
	"github.com/stolostron/multicluster-global-hub/pkg/transport/kafka_confluent"
)

// ThresholdBundleStatus uses a retry threshold to indicate the bundle's processed status.
// count tracks the number of failed processing attempts:
// 0 - the initial value
// 1, 2, 3 ... - how many times processing of the current bundle has failed
// -1 - the bundle was processed successfully
type ThresholdBundleStatus struct {
	maxRetry int
	count    int

	// transport position
	kafkaPosition *metadata.TransportPosition
}

// NewThresholdBundleStatus builds the status from a CloudEvent; max is the maximum
// number of retries allowed when processing of the bundle fails.
func NewThresholdBundleStatus(max int, evt cloudevents.Event) *ThresholdBundleStatus {
	log := ctrl.Log.WithName("threshold-bundle-status")

	topic, err := types.ToString(evt.Extensions()[kafka_confluent.KafkaTopicKey])
	if err != nil {
		log.Info("failed to parse topic from event", "error", err)
	}
	partition, err := types.ToInteger(evt.Extensions()[kafka_confluent.KafkaPartitionKey])
	if err != nil {
		log.Info("failed to parse partition from event", "error", err)
	}

	offsetStr, ok := evt.Extensions()[kafka_confluent.KafkaOffsetKey].(string)
	if !ok {
		log.Info("failed to get offset string from event", "offset", evt.Extensions()[kafka_confluent.KafkaOffsetKey])
	}

	offset, err := strconv.ParseInt(offsetStr, 10, 64)
	if err != nil {
		log.Info("failed to parse offset from event", "offset", offsetStr)
	}

	return &ThresholdBundleStatus{
		maxRetry: max,
		count:    0,

		kafkaPosition: &metadata.TransportPosition{
			Topic:     topic,
			Partition: partition,
			Offset:    offset,
		},
	}
}

func NewThresholdBundleStatusFromPosition(max int, pos *metadata.TransportPosition) *ThresholdBundleStatus {
	return &ThresholdBundleStatus{
		maxRetry:      max,
		count:         0,
		kafkaPosition: pos,
	}
}

// MarkAsProcessed marks the metadata as processed.
func (s *ThresholdBundleStatus) MarkAsProcessed() {
	s.count = -1
}

// Processed returns whether the bundle was processed or not.
func (s *ThresholdBundleStatus) Processed() bool {
	return s.count == -1 || s.count >= s.maxRetry
}

// MarkAsUnprocessed marks the metadata as unprocessed.
func (s *ThresholdBundleStatus) MarkAsUnprocessed() {
	s.count++
}

func (s *ThresholdBundleStatus) GetTransportMetadata() *metadata.TransportPosition {
	return s.kafkaPosition
}
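
A short usage sketch of the threshold semantics (the position values are made up): Processed() reports true either on success (count == -1) or once the retry budget is exhausted (count >= maxRetry), which is what lets a repeatedly failing bundle eventually be committed and skipped instead of being replayed forever:

package main

import (
	"fmt"

	"github.com/stolostron/multicluster-global-hub/pkg/bundle/metadata"
	"github.com/stolostron/multicluster-global-hub/pkg/bundle/metadata/status"
)

func main() {
	pos := &metadata.TransportPosition{Topic: "status.kind-hub1", Partition: 0, Offset: 16}
	s := status.NewThresholdBundleStatusFromPosition(3, pos)

	fmt.Println(s.Processed()) // false: count == 0

	s.MarkAsUnprocessed() // 1st failed attempt
	s.MarkAsUnprocessed() // 2nd
	s.MarkAsUnprocessed() // 3rd: count reaches maxRetry
	fmt.Println(s.Processed()) // true: retry budget exhausted, the offset can be committed

	s.MarkAsProcessed()
	fmt.Println(s.Processed()) // true: -1 marks success
}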
151 changes: 151 additions & 0 deletions pkg/conflator/conflation_committer.go
@@ -0,0 +1,151 @@
package conflator

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/go-logr/logr"
	"gorm.io/gorm/clause"
	ctrl "sigs.k8s.io/controller-runtime"

	"github.com/stolostron/multicluster-global-hub/pkg/bundle/metadata"
	"github.com/stolostron/multicluster-global-hub/pkg/database"
	"github.com/stolostron/multicluster-global-hub/pkg/database/models"
)

const KafkaPartitionDelimiter = "@"

func positionKey(topic string, partition int32) string {
	return fmt.Sprintf("%s%s%d", topic, KafkaPartitionDelimiter, partition)
}

type ConflationCommitter struct {
	log                    logr.Logger
	transportMetadatasFunc metadata.GetBundleStatusesFunc
	committedPositions     map[string]int64
}

func NewKafkaConflationCommitter(getTransportMetadatasFunc metadata.GetBundleStatusesFunc) *ConflationCommitter {
	return &ConflationCommitter{
		log:                    ctrl.Log.WithName("kafka-conflation-committer"),
		transportMetadatasFunc: getTransportMetadatasFunc,
		committedPositions:     map[string]int64{},
	}
}

func (k *ConflationCommitter) Start(ctx context.Context) error {
	go func() {
		ticker := time.NewTicker(time.Second * 5)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C: // wait for next time interval
				err := k.commit()
				if err != nil {
					k.log.Info("failed to commit offset", "error", err)
				}
			case <-ctx.Done():
				k.log.Info("context canceled, exiting committer...")
				return
			}
		}
	}()
	return nil
}

func (k *ConflationCommitter) commit() error {
	// get metadata (both pending and processed)
	transportMetadatas := k.transportMetadatasFunc()

	transPositions := metadataToCommit(transportMetadatas)

	databaseTransports := []models.Transport{}
	for key, transPosition := range transPositions {
		// skip the request if this offset has already been committed
		committedOffset, found := k.committedPositions[key]
		if found && committedOffset >= transPosition.Offset {
			continue
		}

		k.log.Info("commit offset to database", "topic@partition", key, "offset", transPosition.Offset)
		payload, err := json.Marshal(transPosition)
		if err != nil {
			return err
		}
		databaseTransports = append(databaseTransports, models.Transport{
			Name:    transPosition.Topic,
			Payload: payload,
		})
		k.committedPositions[key] = transPosition.Offset
	}

	db := database.GetGorm()
	if len(databaseTransports) > 0 {
		err := db.Clauses(clause.OnConflict{
			UpdateAll: true,
		}).CreateInBatches(databaseTransports, 100).Error
		if err != nil {
			return err
		}
	}
	return nil
}

func metadataToCommit(metadataArray []metadata.BundleStatus) map[string]*metadata.TransportPosition {
	// track the lowest offset per partition among pending bundles and the
	// highest offset per partition among processed bundles
	pendingLowestMetadataMap := make(map[string]*metadata.TransportPosition)
	processedHighestMetadataMap := make(map[string]*metadata.TransportPosition)

	for _, transportMetadata := range metadataArray {
		bundleStatus, ok := transportMetadata.(metadata.TransportMetadata)
		if !ok {
			continue // shouldn't happen
		}

		pos := bundleStatus.GetTransportMetadata()
		key := positionKey(pos.Topic, pos.Partition)

		if !bundleStatus.Processed() {
			// this belongs to a pending bundle, update the lowest-offsets-map
			lowestMetadata, found := pendingLowestMetadataMap[key]
			if found && pos.Offset >= lowestMetadata.Offset {
				continue // offset is not the lowest in partition, skip
			}
			pendingLowestMetadataMap[key] = pos
		} else {
			// this belongs to a processed bundle, update the highest-offsets-map
			highestMetadata, found := processedHighestMetadataMap[key]
			if found && pos.Offset <= highestMetadata.Offset {
				continue // offset is not the highest in partition, skip
			}
			processedHighestMetadataMap[key] = pos
		}
	}

	// increment processed offsets so they are not re-read on kafka consumer restart
	for key := range processedHighestMetadataMap {
		processedHighestMetadataMap[key] = &metadata.TransportPosition{
			Topic:     processedHighestMetadataMap[key].Topic,
			Partition: processedHighestMetadataMap[key].Partition,
			Offset:    processedHighestMetadataMap[key].Offset + 1,
		}
	}

	// patch the processed-offsets map with the pending ones, so that if a topic@partition
	// has both types, the pending bundle takes priority (overwrites)
	for key, pos := range pendingLowestMetadataMap {
		processedHighestMetadataMap[key] = pos
	}

	return processedHighestMetadataMap
}
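
A worked example of the commit rule: per topic@partition, the committer stores the lowest pending offset when anything is still pending, otherwise the highest processed offset plus one, so a restart replays exactly the unprocessed events. The following test-style sketch (not part of the PR, placed in the conflator package so it can reach the unexported metadataToCommit) checks that a pending bundle wins over higher processed offsets:

package conflator

import (
	"testing"

	"github.com/stolostron/multicluster-global-hub/pkg/bundle/metadata"
	"github.com/stolostron/multicluster-global-hub/pkg/bundle/metadata/status"
)

func TestMetadataToCommit(t *testing.T) {
	newStatus := func(offset int64, processed bool) metadata.BundleStatus {
		s := status.NewThresholdBundleStatusFromPosition(3, &metadata.TransportPosition{
			Topic: "status.kind-hub1", Partition: 0, Offset: offset,
		})
		if processed {
			s.MarkAsProcessed()
		}
		return s
	}

	positions := metadataToCommit([]metadata.BundleStatus{
		newStatus(14, true),  // processed
		newStatus(15, true),  // processed
		newStatus(16, false), // pending: the lowest pending offset wins
		newStatus(17, true),  // highest processed would yield 18 (17 + 1)
	})

	got := positions[positionKey("status.kind-hub1", 0)].Offset
	if got != 16 {
		t.Fatalf("expected committed offset 16 (lowest pending), got %d", got)
	}
}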