Merge pull request #1884 from tisnik/refactor-consumers
Refactor consumers
tisnik authored Nov 10, 2023
2 parents 834c1f7 + b3e14fb commit 3e07c3a
Showing 9 changed files with 314 additions and 291 deletions.
4 changes: 2 additions & 2 deletions consumer.go
@@ -24,7 +24,7 @@ import (
)

var (
-    consumerInstance *consumer.KafkaConsumer
+    consumerInstance *consumer.OCPRulesConsumer
    consumerInstanceIsStarting, finishConsumerInstanceInitialization = context.WithCancel(context.Background())
)

@@ -40,7 +40,7 @@ func startConsumer(brokerConf broker.Configuration) error {

    defer closeStorage(dbStorage)

-    consumerInstance, err = consumer.New(brokerConf, dbStorage)
+    consumerInstance, err = consumer.NewOCPRulesConsumer(brokerConf, dbStorage)
    if err != nil {
        log.Error().Err(err).Msg("Broker initialization error")
        return err
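For callers, the visible effect of this file's change is the renamed constructor: consumer.New becomes consumer.NewOCPRulesConsumer, keeping the (broker.Configuration, storage.Storage) signature shown in the diff. A minimal sketch of a call site after the rename, adapted from the usage example in the old package documentation further below; brokerCfg and dbStorage are hypothetical placeholders, and running Serve in a goroutine is just one way to wire it:

    ocpConsumer, err := consumer.NewOCPRulesConsumer(brokerCfg, dbStorage)
    if err != nil {
        log.Error().Err(err).Msg("Broker initialization error")
        return err
    }

    // Serve blocks the calling goroutine until the consumer's context is
    // cancelled; Close releases the consumer group and attached producers.
    go ocpConsumer.Serve()
    defer func() {
        if err := ocpConsumer.Close(); err != nil {
            log.Error().Err(err).Msg("unable to close consumer")
        }
    }()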
4 changes: 2 additions & 2 deletions consumer/benchmark_test.go
@@ -32,7 +32,7 @@ import (
)

func benchmarkProcessingMessage(b *testing.B, s storage.Storage, messageProducer func() string) {
-    kafkaConsumer := &consumer.KafkaConsumer{
+    kafkaConsumer := &consumer.OCPRulesConsumer{
        Storage: s,
    }

@@ -149,7 +149,7 @@ func BenchmarkKafkaConsumer_ProcessMessage_RealMessages(b *testing.B) {
    }
    defer ira_helpers.MustCloseStorage(b, benchStorage)

-    kafkaConsumer := &consumer.KafkaConsumer{
+    kafkaConsumer := &consumer.OCPRulesConsumer{
        Storage: benchStorage,
    }

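The body of benchmarkProcessingMessage is collapsed in this diff; the sketch below is a hypothetical reconstruction of how such a helper could drive the renamed consumer, assuming each generated payload is wrapped the way sarama delivers it. Only the signature and the struct literal above come from the actual change:

    func benchmarkProcessingMessage(b *testing.B, s storage.Storage, messageProducer func() string) {
        ocpConsumer := &consumer.OCPRulesConsumer{
            Storage: s,
        }

        b.ResetTimer()
        for i := 0; i < b.N; i++ {
            // Wrap the generated payload as sarama would deliver it.
            message := sarama.ConsumerMessage{Value: []byte(messageProducer())}
            if err := ocpConsumer.HandleMessage(&message); err != nil {
                b.Fatal(err)
            }
        }
    }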
236 changes: 2 additions & 234 deletions consumer/consumer.go
@@ -14,34 +14,15 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

-// Package consumer contains interface for any consumer that is able to
-// process messages. It also contains implementation of Kafka consumer.
-//
-// It is expected that consumed messages are generated by ccx-data-pipeline
-// based on OCP rules framework. The report generated by the framework are
-// enhanced with more context information taken from different sources, like
-// the organization ID, account number, unique cluster name, and the
-// LastChecked timestamp (taken from the incoming Kafka record containing the
-// URL to the archive).
-//
-// It is also expected that consumed messages contains one INFO rule hit that
-// contains cluster version. That rule hit is produced by special rule used
-// only in external data pipeline:
-// "version_info|CLUSTER_VERSION_INFO"
+// Package consumer contains interface for any consumer that is able to process
+// messages. It also contains implementation of various Kafka consumers.
//
// For more information please see:
// https://redhatinsights.github.io/insights-data-schemas/external-pipeline/ccx_data_pipeline.html
package consumer

import (
-    "context"
-
    "github.com/Shopify/sarama"
-    "github.com/rs/zerolog/log"
-
-    "github.com/RedHatInsights/insights-results-aggregator/broker"
-    "github.com/RedHatInsights/insights-results-aggregator/producer"
-    "github.com/RedHatInsights/insights-results-aggregator/storage"
)

// Consumer represents any consumer of insights-rules messages
@@ -50,216 +31,3 @@ type Consumer interface {
    Close() error
    HandleMessage(msg *sarama.ConsumerMessage) error
}

-// KafkaConsumer is an implementation of Consumer interface
-// Example:
-//
-// kafkaConsumer, err := consumer.New(brokerCfg, storage)
-//
-// if err != nil {
-//     panic(err)
-// }
-//
-// kafkaConsumer.Serve()
-//
-// err := kafkaConsumer.Stop()
-//
-// if err != nil {
-//     panic(err)
-// }
-type KafkaConsumer struct {
-    Configuration                        broker.Configuration
-    ConsumerGroup                        sarama.ConsumerGroup
-    Storage                              storage.Storage
-    numberOfSuccessfullyConsumedMessages uint64
-    numberOfErrorsConsumingMessages      uint64
-    ready                                chan bool
-    cancel                               context.CancelFunc
-    payloadTrackerProducer               *producer.PayloadTrackerProducer
-    deadLetterProducer                   *producer.DeadLetterProducer
-}
-
-// DefaultSaramaConfig is a config which will be used by default
-// here you can use specific version of a protocol for example
-// useful for testing
-var DefaultSaramaConfig *sarama.Config
-
-// New constructs new implementation of Consumer interface
-func New(brokerCfg broker.Configuration, storage storage.Storage) (*KafkaConsumer, error) {
-    return NewWithSaramaConfig(brokerCfg, storage, DefaultSaramaConfig)
-}
-
-// NewWithSaramaConfig constructs new implementation of Consumer interface with custom sarama config
-func NewWithSaramaConfig(
-    brokerCfg broker.Configuration,
-    storage storage.Storage,
-    saramaConfig *sarama.Config,
-) (*KafkaConsumer, error) {
-    var err error
-
-    if saramaConfig == nil {
-        saramaConfig, err = broker.SaramaConfigFromBrokerConfig(brokerCfg)
-        if err != nil {
-            log.Error().Err(err).Msg("unable to create sarama configuration from current broker configuration")
-            return nil, err
-        }
-    }
-
-    log.Info().
-        Str("addr", brokerCfg.Address).
-        Str("group", brokerCfg.Group).
-        Msg("New consumer group")
-
-    consumerGroup, err := sarama.NewConsumerGroup([]string{brokerCfg.Address}, brokerCfg.Group, saramaConfig)
-    if err != nil {
-        log.Error().Err(err).Msg("Unable to create consumer group")
-        return nil, err
-    }
-    log.Info().Msg("Consumer group has been created")
-
-    log.Info().Msg("Constructing payload tracker producer")
-    payloadTrackerProducer, err := producer.NewPayloadTrackerProducer(brokerCfg)
-    if err != nil {
-        log.Error().Err(err).Msg("Unable to construct payload tracker producer")
-        return nil, err
-    }
-    if payloadTrackerProducer == nil {
-        log.Info().Msg("Payload tracker producer not configured")
-    } else {
-        log.Info().Msg("Payload tracker producer has been configured")
-    }
-
-    log.Info().Msg("Constructing DLQ producer")
-    deadLetterProducer, err := producer.NewDeadLetterProducer(brokerCfg)
-    if err != nil {
-        log.Error().Err(err).Msg("Unable to construct dead letter producer")
-        return nil, err
-    }
-    if deadLetterProducer == nil {
-        log.Info().Msg("Dead letter producer not configured")
-    } else {
-        log.Info().Msg("Dead letter producer has been configured")
-    }
-
-    consumer := &KafkaConsumer{
-        Configuration:                        brokerCfg,
-        ConsumerGroup:                        consumerGroup,
-        Storage:                              storage,
-        numberOfSuccessfullyConsumedMessages: 0,
-        numberOfErrorsConsumingMessages:      0,
-        ready:                                make(chan bool),
-        payloadTrackerProducer:               payloadTrackerProducer,
-        deadLetterProducer:                   deadLetterProducer,
-    }
-
-    return consumer, nil
-}
-
-// Serve starts listening for messages and processing them. It blocks current thread.
-func (consumer *KafkaConsumer) Serve() {
-    ctx, cancel := context.WithCancel(context.Background())
-    consumer.cancel = cancel
-
-    go func() {
-        for {
-            // `Consume` should be called inside an infinite loop, when a
-            // server-side rebalance happens, the consumer session will need to be
-            // recreated to get the new claims
-            if err := consumer.ConsumerGroup.Consume(ctx, []string{consumer.Configuration.Topic}, consumer); err != nil {
-                log.Fatal().Err(err).Msg("unable to recreate kafka session")
-            }
-
-            // check if context was cancelled, signaling that the consumer should stop
-            if ctx.Err() != nil {
-                return
-            }
-
-            log.Info().Msg("created new kafka session")
-
-            consumer.ready = make(chan bool)
-        }
-    }()
-
-    // Await till the consumer has been set up
-    log.Info().Msg("waiting for consumer to become ready")
-    <-consumer.ready
-    log.Info().Msg("finished waiting for consumer to become ready")
-
-    // Actual processing is done in goroutine created by sarama (see ConsumeClaim below)
-    log.Info().Msg("started serving consumer")
-    <-ctx.Done()
-    log.Info().Msg("context cancelled, exiting")
-
-    cancel()
-}
-
-// Setup is run at the beginning of a new session, before ConsumeClaim
-func (consumer *KafkaConsumer) Setup(sarama.ConsumerGroupSession) error {
-    log.Info().Msg("new session has been setup")
-    // Mark the consumer as ready
-    close(consumer.ready)
-    return nil
-}
-
-// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
-func (consumer *KafkaConsumer) Cleanup(sarama.ConsumerGroupSession) error {
-    log.Info().Msg("new session has been finished")
-    return nil
-}
-
-// ConsumeClaim starts a consumer loop of ConsumerGroupClaim's Messages().
-func (consumer *KafkaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
-    log.Info().
-        Int64(offsetKey, claim.InitialOffset()).
-        Msg("starting messages loop")
-
-    for message := range claim.Messages() {
-        err := consumer.HandleMessage(message)
-        if err != nil {
-            // already handled in HandleMessage, just log
-            log.Error().Err(err).Msg("Problem while handling the message")
-        }
-        session.MarkMessage(message, "")
-    }
-
-    return nil
-}
-
-// Close method closes all resources used by consumer
-func (consumer *KafkaConsumer) Close() error {
-    if consumer.cancel != nil {
-        consumer.cancel()
-    }
-
-    if consumer.ConsumerGroup != nil {
-        if err := consumer.ConsumerGroup.Close(); err != nil {
-            log.Error().Err(err).Msg("unable to close consumer group")
-        }
-    }
-
-    if consumer.payloadTrackerProducer != nil {
-        if err := consumer.payloadTrackerProducer.Close(); err != nil {
-            log.Error().Err(err).Msg("unable to close payload tracker Kafka producer")
-        }
-    }
-
-    if consumer.deadLetterProducer != nil {
-        if err := consumer.deadLetterProducer.Close(); err != nil {
-            log.Error().Err(err).Msg("unable to close dead letter Kafka producer")
-        }
-    }
-
-    return nil
-}
-
-// GetNumberOfSuccessfullyConsumedMessages returns number of consumed messages
-// since creating KafkaConsumer obj
-func (consumer *KafkaConsumer) GetNumberOfSuccessfullyConsumedMessages() uint64 {
-    return consumer.numberOfSuccessfullyConsumedMessages
-}
-
-// GetNumberOfErrorsConsumingMessages returns number of errors during consuming messages
-// since creating KafkaConsumer obj
-func (consumer *KafkaConsumer) GetNumberOfErrorsConsumingMessages() uint64 {
-    return consumer.numberOfErrorsConsumingMessages
-}
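After this commit, consumer.go retains only the Consumer interface; the KafkaConsumer implementation deleted above lives on under the OCPRulesConsumer name in one of the other changed files not shown in this excerpt. Any type providing the interface's methods can be plugged into the aggregator, for example in tests. A hypothetical no-op implementation, assuming the interface also declares Serve() in the context collapsed above the hunk:

    // NopConsumer is a hypothetical stand-in, not part of this commit.
    type NopConsumer struct{}

    func (c *NopConsumer) Serve()       {}
    func (c *NopConsumer) Close() error { return nil }

    func (c *NopConsumer) HandleMessage(msg *sarama.ConsumerMessage) error {
        // A real consumer would parse msg.Value and write the report to storage.
        return nil
    }

    // Compile-time check that NopConsumer satisfies Consumer.
    var _ Consumer = (*NopConsumer)(nil)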