Replay the unprocessed event from transport when the manager restarts #751
@@ -114,3 +114,11 @@ CREATE TABLE IF NOT EXISTS history.local_compliance_job_log (
    offsets int8,
    error TEXT
);

CREATE TABLE IF NOT EXISTS status.transport (
    -- transport name, it is the topic name for the kafka transport
    name character varying(254) PRIMARY KEY,
    payload jsonb NOT NULL,
    created_at timestamp without time zone DEFAULT now() NOT NULL,
    updated_at timestamp without time zone DEFAULT now() NOT NULL
);

Review comment (on the name column): topic name or managed hub cluster name?

Reply: It's the topic name, because this table is only used by the transport:

    hoh=# select * from status.transport;
           name       |            payload             |         created_at         |         updated_at
    ------------------+--------------------------------+----------------------------+----------------------------
     status.kind-hub1 | {"offset": 16, "partition": 0} | 2024-01-04 02:37:05.56802  | 2024-01-09 07:27:48.741223
     status.kind-hub2 | {"offset": 16, "partition": 0} | 2024-01-04 02:37:05.56802  | 2024-01-09 07:27:48.741223
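
On restart, the manager can read this row back and decode the payload into a transport position. A minimal sketch, assuming the models.Transport and metadata.TransportPosition types from this PR; the loadTransportPosition helper and its error handling are illustrative, not part of the change:

    // Hypothetical helper: fetch the stored position for a topic and decode it.
    func loadTransportPosition(topicName string) (*metadata.TransportPosition, error) {
        db := database.GetGorm()

        var transport models.Transport
        if err := db.Where("name = ?", topicName).First(&transport).Error; err != nil {
            return nil, err // includes the no-row case: first start, nothing to replay
        }

        pos := &metadata.TransportPosition{}
        if err := json.Unmarshal(transport.Payload, pos); err != nil {
            return nil, err
        }
        return pos, nil
    }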
@@ -0,0 +1,29 @@
package status

// NewGenericBundleStatus returns a new instance of GenericBundleStatus
func NewGenericBundleStatus() *GenericBundleStatus {
    return &GenericBundleStatus{
        processed: false,
    }
}

// GenericBundleStatus wraps the shared data/functionality that the different transport
// BundleMetadata implementations can be based on.
type GenericBundleStatus struct {
    processed bool
}

// MarkAsProcessed marks the metadata as processed.
func (genericBundleStatus *GenericBundleStatus) MarkAsProcessed() {
    genericBundleStatus.processed = true
}

// MarkAsUnprocessed marks the metadata as unprocessed.
func (genericBundleStatus *GenericBundleStatus) MarkAsUnprocessed() {
    genericBundleStatus.processed = false
}

// Processed returns whether the bundle was processed or not.
func (genericBundleStatus *GenericBundleStatus) Processed() bool {
    return genericBundleStatus.processed
}
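
As the doc comment says, transport-specific metadata types are meant to build on this struct and inherit the processed-flag handling. A minimal hypothetical sketch of such an embedding; the inventoryBundleStatus type and its fields are invented for illustration and are not part of this PR:

    // Hypothetical transport-specific metadata built on GenericBundleStatus: it
    // inherits MarkAsProcessed/MarkAsUnprocessed/Processed and only adds the
    // position information its transport needs.
    type inventoryBundleStatus struct {
        GenericBundleStatus

        topic  string
        offset int64
    }

    func newInventoryBundleStatus(topic string, offset int64) *inventoryBundleStatus {
        return &inventoryBundleStatus{
            GenericBundleStatus: *NewGenericBundleStatus(),
            topic:               topic,
            offset:              offset,
        }
    }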
@@ -0,0 +1,87 @@
package status

import (
    "strconv"

    cloudevents "github.com/cloudevents/sdk-go/v2"
    "github.com/cloudevents/sdk-go/v2/types"
    ctrl "sigs.k8s.io/controller-runtime"

    "github.com/stolostron/multicluster-global-hub/pkg/bundle/metadata"
    "github.com/stolostron/multicluster-global-hub/pkg/transport/kafka_confluent"
)
// ThresholdBundleStatus uses a threshold to indicate the bundle's processed status.
// The count records the retry attempts:
//    0          - the default initial value
//    1, 2, 3 .. - how many times processing of the current bundle has failed
//   -1          - the bundle was processed successfully
type ThresholdBundleStatus struct {
    maxRetry int
    count    int

    // transport position
    kafkaPosition *metadata.TransportPosition
}

// NewThresholdBundleStatus creates a status with max as the retry limit for a
// bundle whose processing has failed.
func NewThresholdBundleStatus(max int, evt cloudevents.Event) *ThresholdBundleStatus {
    log := ctrl.Log.WithName("threshold-bundle-status")

    topic, err := types.ToString(evt.Extensions()[kafka_confluent.KafkaTopicKey])
    if err != nil {
        log.Info("failed to parse topic from event", "error", err)
    }
    partition, err := types.ToInteger(evt.Extensions()[kafka_confluent.KafkaPartitionKey])
    if err != nil {
        log.Info("failed to parse partition from event", "error", err)
    }

    offsetStr, ok := evt.Extensions()[kafka_confluent.KafkaOffsetKey].(string)
    if !ok {
        log.Info("failed to get offset string from event", "offset", evt.Extensions()[kafka_confluent.KafkaOffsetKey])
    }

    offset, err := strconv.ParseInt(offsetStr, 10, 64)
    if err != nil {
        log.Info("failed to parse offset from event", "offset", offsetStr)
    }

    return &ThresholdBundleStatus{
        maxRetry: max,
        count:    0,

        kafkaPosition: &metadata.TransportPosition{
            Topic:     topic,
            Partition: partition,
            Offset:    offset,
        },
    }
}

// NewThresholdBundleStatusFromPosition creates a status from an already-known
// transport position, e.g. one restored from the database.
func NewThresholdBundleStatusFromPosition(max int, pos *metadata.TransportPosition) *ThresholdBundleStatus {
    return &ThresholdBundleStatus{
        maxRetry:      max,
        count:         0,
        kafkaPosition: pos,
    }
}

// MarkAsProcessed marks the metadata as processed.
func (s *ThresholdBundleStatus) MarkAsProcessed() {
    s.count = -1
}

// Processed returns whether the bundle was processed or not; a bundle that has
// reached the retry threshold also counts as processed.
func (s *ThresholdBundleStatus) Processed() bool {
    return s.count == -1 || s.count >= s.maxRetry
}

// MarkAsUnprocessed records another failed processing attempt.
func (s *ThresholdBundleStatus) MarkAsUnprocessed() {
    s.count++
}

// GetTransportMetadata returns the kafka position of the bundle's event.
func (s *ThresholdBundleStatus) GetTransportMetadata() *metadata.TransportPosition {
    return s.kafkaPosition
}
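
The intended lifecycle, as these methods suggest: create one status per consumed event, bump the count on each failed attempt, and treat the bundle as done once it either succeeds or exhausts its retries. A minimal sketch, where processBundle is a hypothetical stand-in for the real handler:

    // Retry loop driven by ThresholdBundleStatus; Processed() turns true either
    // on success (count == -1) or once count reaches maxRetry.
    func handleEvent(evt cloudevents.Event, processBundle func(cloudevents.Event) error) {
        status := NewThresholdBundleStatus(3, evt) // give up after 3 failed attempts

        for !status.Processed() {
            if err := processBundle(evt); err != nil {
                status.MarkAsUnprocessed() // record the failure
                continue
            }
            status.MarkAsProcessed()
        }
    }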
@@ -0,0 +1,151 @@
package conflator

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/go-logr/logr"
    "gorm.io/gorm/clause"
    ctrl "sigs.k8s.io/controller-runtime"

    "github.com/stolostron/multicluster-global-hub/pkg/bundle/metadata"
    "github.com/stolostron/multicluster-global-hub/pkg/database"
    "github.com/stolostron/multicluster-global-hub/pkg/database/models"
)

const KafkaPartitionDelimiter = "@"

func positionKey(topic string, partition int32) string {
    return fmt.Sprintf("%s%s%d", topic, KafkaPartitionDelimiter, partition)
}

type ConflationCommitter struct {
    log                    logr.Logger
    transportMetadatasFunc metadata.GetBundleStatusesFunc
    committedPositions     map[string]int64
}

func NewKafkaConflationCommitter(getTransportMetadatasFunc metadata.GetBundleStatusesFunc) *ConflationCommitter {
    return &ConflationCommitter{
        log:                    ctrl.Log.WithName("kafka-conflation-committer"),
        transportMetadatasFunc: getTransportMetadatasFunc,
        committedPositions:     map[string]int64{},
    }
}

func (k *ConflationCommitter) Start(ctx context.Context) error {
    go func() {
        ticker := time.NewTicker(time.Second * 5)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C: // wait for the next interval
                err := k.commit()
                if err != nil {
                    k.log.Info("failed to commit offset", "error", err)
                }
            case <-ctx.Done():
                k.log.Info("context canceled, exiting committer...")
                return
            }
        }
    }()
    return nil
}

func (k *ConflationCommitter) commit() error {
    // get metadata (both pending and processed)
    transportMetadatas := k.transportMetadatasFunc()

    transPositions := metadataToCommit(transportMetadatas)

    databaseTransports := []models.Transport{}
    for key, transPosition := range transPositions {
        // skip the request if this offset has already been committed
        committedOffset, found := k.committedPositions[key]
        if found && committedOffset >= int64(transPosition.Offset) {
            continue
        }

        k.log.Info("commit offset to database", "topic@partition", key, "offset", transPosition.Offset)
        payload, err := json.Marshal(metadata.TransportPosition{
            Topic:     transPosition.Topic,
            Partition: transPosition.Partition,
            Offset:    int64(transPosition.Offset),
        })
        if err != nil {
            return err
        }
        databaseTransports = append(databaseTransports, models.Transport{
            Name:    transPosition.Topic,
            Payload: payload,
        })
        k.committedPositions[key] = int64(transPosition.Offset)
    }

    db := database.GetGorm()
    if len(databaseTransports) > 0 {
        err := db.Clauses(clause.OnConflict{
            UpdateAll: true,
        }).CreateInBatches(databaseTransports, 100).Error
        if err != nil {
            return err
        }
    }
    return nil
}

func metadataToCommit(metadataArray []metadata.BundleStatus) map[string]*metadata.TransportPosition {
    // extract the lowest offset per partition among the pending bundles and the
    // highest offset per partition among the processed bundles
    pendingLowestMetadataMap := make(map[string]*metadata.TransportPosition)
    processedHighestMetadataMap := make(map[string]*metadata.TransportPosition)

    for _, transportMetadata := range metadataArray {
        bundleStatus, ok := transportMetadata.(metadata.TransportMetadata)
        if !ok {
            continue // shouldn't happen
        }

        metadata := bundleStatus.GetTransportMetadata()
        key := positionKey(metadata.Topic, metadata.Partition)

        if !bundleStatus.Processed() {
            // this belongs to a pending bundle, update the lowest-offsets-map
            lowestMetadata, found := pendingLowestMetadataMap[key]
            if found && metadata.Offset >= lowestMetadata.Offset {
                continue // offset is not the lowest in the partition, skip
            }

            pendingLowestMetadataMap[key] = metadata
        } else {
            // this belongs to a processed bundle, update the highest-offsets-map
            highestMetadata, found := processedHighestMetadataMap[key]
            if found && metadata.Offset <= highestMetadata.Offset {
                continue // offset is not the highest in the partition, skip
            }

            processedHighestMetadataMap[key] = metadata
        }
    }

    // increment the processed offsets so they are not re-read on kafka consumer restart
    for key := range processedHighestMetadataMap {
        processedHighestMetadataMap[key] = &metadata.TransportPosition{
            Topic:     processedHighestMetadataMap[key].Topic,
            Partition: processedHighestMetadataMap[key].Partition,
            Offset:    processedHighestMetadataMap[key].Offset + 1,
        }
    }

    // patch the processed-offsets map with the pending ones, so that if a topic@partition
    // has both types, the pending bundle takes priority (overwrites)
    for key, metadata := range pendingLowestMetadataMap {
        processedHighestMetadataMap[key] = metadata
    }

    return processedHighestMetadataMap
}
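
This is the half that persists positions; the replay half, per the PR title, is the consumer seeding itself from the stored position on restart. A sketch of what that could look like with confluent-kafka-go, assuming a position already loaded from status.transport; the assignFromPosition helper and the v2 import path are assumptions, not code from this PR:

    import (
        "github.com/confluentinc/confluent-kafka-go/v2/kafka"

        "github.com/stolostron/multicluster-global-hub/pkg/bundle/metadata"
    )

    // Hypothetical restart hook: start the partition at the committed offset,
    // which metadataToCommit already advanced to "last processed + 1".
    func assignFromPosition(consumer *kafka.Consumer, pos *metadata.TransportPosition) error {
        return consumer.Assign([]kafka.TopicPartition{{
            Topic:     &pos.Topic,
            Partition: pos.Partition,
            Offset:    kafka.Offset(pos.Offset),
        }})
    }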

Review comment (clyang82): Need to handle the upgrade case.

Reply: @clyang82 What do you mean by the upgrade case? The table hasn't been created before, so it will be created if the database doesn't already contain it. And the operator will handle the upgrade.