-
Notifications
You must be signed in to change notification settings - Fork 0
/
kafka.go
265 lines (225 loc) · 7.79 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
package kafka
import (
"context"
"errors"
"fmt"
"os"
"strings"
"sync"
"time"
"github.com/InVisionApp/go-health/v2"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/google/uuid"
)
// Defaults applied by validateKafkaConfig when the corresponding
// KafkaConfig property is left unset.
const (
	// consumerGroupPrefix prefixes the per-instance consumer group id built
	// by buildUniqueConsumerGroupId.
	consumerGroupPrefix string = "health-check-"
	// defaultTopic is used when KafkaConfig.Topic is empty/blank.
	defaultTopic string = "health-checks"
	// defaultPollTimeout bounds each consumer Poll iteration.
	defaultPollTimeout time.Duration = time.Millisecond * 200
	// defaultCheckTimeout bounds the whole produce/consume roundtrip.
	defaultCheckTimeout time.Duration = time.Second
	// defaultSkipConsumerTimeouts: by default no consumer timeouts are skipped.
	defaultSkipConsumerTimeouts int = 0
)
// librdkafka configuration keys that this check manages itself; values for
// these keys supplied through ConsumerConfig/ProducerConfig are ignored by
// NewKafka.
const (
	bootstrapServersKey string = "bootstrap.servers"
	groupIdKey          string = "group.id"
	autoOffsetResetKey  string = "auto.offset.reset"
)
// checkTimeoutError is a custom error type used to represent a timeout that
// occurs while executing the status check (while a message is being sent or
// consumed). Status treats this error type specially: a configured number of
// consumer timeouts can be skipped at startup.
type checkTimeoutError struct {
	message string // human-readable description of where the timeout happened
}

// Error implements the error interface.
func (e *checkTimeoutError) Error() string {
	return e.message
}
// hostnameProvider is a generic contract to get the hostname. We use the
// hostname to build the consumer group identifier.
type hostnameProvider func() (string, error)

// hnProvider is the default hostname provider. It is a package-level variable,
// presumably so a deterministic implementation can be substituted in tests —
// TODO(review): confirm.
var hnProvider hostnameProvider = os.Hostname
// ConsumerConfig holds free-form librdkafka consumer settings (key -> value)
// merged into the consumer's kafka.ConfigMap by NewKafka; reserved keys
// (bootstrap.servers, group.id, auto.offset.reset) are ignored.
type ConsumerConfig map[string]any

// ProducerConfig holds free-form librdkafka producer settings (key -> value)
// merged into the producer's kafka.ConfigMap by NewKafka; the reserved
// bootstrap.servers key is ignored.
type ProducerConfig map[string]any
// KafkaConfig is used for configuring the go-kafka check. Only
// BootstrapServers is mandatory; every other property falls back to a default
// (see validateKafkaConfig).
type KafkaConfig struct {
	BootstrapServers     string         // comma separated list of kafka brokers
	Topic                string         // topic to connect to (make sure it exists)
	PollTimeout          time.Duration  // time spent fetching the data from the topic
	CheckTimeout         time.Duration  // maximum time to wait for the check to complete
	SkipConsumerTimeouts int            // maximum number of check timeouts to skip at the beginning when consuming messages
	ConsumerConfig       ConsumerConfig // consumer configuration (see https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)
	ProducerConfig       ProducerConfig // producer configuration (see https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)
}
// Kafka implements the "ICheckable" interface. It verifies broker
// connectivity end to end by producing a probe message to a health topic and
// consuming it back (see Status).
type Kafka struct {
	config   *KafkaConfig    // validated configuration with defaults applied
	producer *kafka.Producer // writes the probe message
	consumer *kafka.Consumer // reads the probe message back
}

// Interface compliance verification.
var _ health.ICheckable = (*Kafka)(nil)
// NewKafka builds a go-kafka check initialized with the provided configuration.
//
// It validates cfg (applying defaults), creates a consumer subscribed to the
// health topic under a unique consumer group id — so each instance consumes
// independently — and a producer pointing at the same brokers. Reserved keys
// (bootstrap.servers, group.id, auto.offset.reset for the consumer;
// bootstrap.servers for the producer) cannot be overridden through
// ConsumerConfig/ProducerConfig.
//
// Any partially created consumer/producer is closed before returning an
// error, so a failed constructor does not leak librdkafka handles.
func NewKafka(cfg KafkaConfig) (*Kafka, error) {
	if err := validateKafkaConfig(&cfg); err != nil {
		return nil, fmt.Errorf("invalid kafka config: %w", err)
	}

	cc := kafka.ConfigMap{
		bootstrapServersKey: cfg.BootstrapServers,
		groupIdKey:          buildUniqueConsumerGroupId(),
		autoOffsetResetKey:  "latest",
	}
	for key, value := range cfg.ConsumerConfig {
		// Reserved keys are managed by the check itself.
		if key != bootstrapServersKey && key != groupIdKey && key != autoOffsetResetKey {
			cc[key] = value
		}
	}
	c, err := kafka.NewConsumer(&cc)
	if err != nil {
		return nil, fmt.Errorf("failed to create kafka consumer: %w", err)
	}

	pc := kafka.ConfigMap{
		bootstrapServersKey: cfg.BootstrapServers,
	}
	for key, value := range cfg.ProducerConfig {
		if key != bootstrapServersKey {
			pc[key] = value
		}
	}
	p, err := kafka.NewProducer(&pc)
	if err != nil {
		c.Close() // best-effort cleanup: don't leak the consumer on partial init
		return nil, fmt.Errorf("failed to create kafka producer: %w", err)
	}

	if err := c.SubscribeTopics([]string{cfg.Topic}, nil); err != nil {
		c.Close() // best-effort cleanup of both handles
		p.Close()
		return nil, fmt.Errorf("failed to subscribe to topic: %w", err)
	}

	return &Kafka{
		config:   &cfg,
		consumer: c,
		producer: p,
	}, nil
}
// validateKafkaConfig checks the mandatory properties of cfg and fills in
// defaults for every optional property that was left unset or invalid. The
// configuration is mutated in place; an error is returned only when a
// mandatory property is missing.
func validateKafkaConfig(cfg *KafkaConfig) error {
	// BootstrapServers is the only property without a usable default.
	if cfg.BootstrapServers == "" {
		return errors.New("BootstrapServers property is mandatory")
	}

	// Everything below is optional: backfill defaults where needed.
	if strings.TrimSpace(cfg.Topic) == "" {
		cfg.Topic = defaultTopic
	}
	if cfg.CheckTimeout <= 0 {
		cfg.CheckTimeout = defaultCheckTimeout
	}
	if cfg.PollTimeout <= 0 {
		cfg.PollTimeout = defaultPollTimeout
	}
	if cfg.SkipConsumerTimeouts <= 0 {
		cfg.SkipConsumerTimeouts = defaultSkipConsumerTimeouts
	}

	return nil
}
// buildUniqueConsumerGroupId builds a consumer group identifier that is
// unique per process instance by combining the configured prefix, the local
// hostname and a nanosecond timestamp. This lets each instance (of this or
// any other application) consume health messages independently, at the cost
// of accumulating stale consumer groups on every restart — a periodic cleanup
// process on the kafka cluster is recommended.
func buildUniqueConsumerGroupId() string {
	host, err := hnProvider()
	if err != nil {
		// Fall back to a placeholder rather than failing the check setup.
		host = "unknownhost"
	}
	return fmt.Sprintf("%s@@%s@@%d", consumerGroupPrefix, host, time.Now().UnixNano())
}
// Status executes the Kafka health check, satisfying health.ICheckable.
//
// It sends a random message to the configured health check topic and then
// confirms the reception of that same message by the consumer, all within the
// CheckTimeout deadline. The returned map carries human-readable details:
// a "producer" or "consumer" entry on failure, or an "info" entry on success
// and on a skipped consumer timeout.
//
// SkipConsumerTimeouts allows a configured number of consumer-side timeouts
// at the beginning to pass as healthy; the counter is reset to zero after the
// first successful roundtrip (or a producer error).
//
// NOTE(review): this method mutates k.config.SkipConsumerTimeouts without
// synchronization — it assumes checks run serially; confirm before invoking
// concurrently.
func (k *Kafka) Status() (interface{}, error) {
	checkStart := time.Now()
	details := make(map[string]string)

	ctx, cancel := context.WithTimeout(context.Background(), k.config.CheckTimeout)
	defer cancel()

	expectedMessage, err := k.sendMessage(ctx)
	if err != nil {
		// If the application starts with a producer error we don't want to skip
		// next consumer iterations so we set the value to zero.
		k.config.SkipConsumerTimeouts = 0
		details["producer"] = err.Error()
		return details, errors.New("error sending messages")
	}

	if err := k.consumeMessage(ctx, expectedMessage); err != nil {
		// errors.As (rather than a raw type assertion) also matches a wrapped
		// *checkTimeoutError, should consumeMessage ever wrap its errors.
		var timeoutErr *checkTimeoutError
		if errors.As(err, &timeoutErr) && k.config.SkipConsumerTimeouts > 0 {
			details["info"] = fmt.Sprintf("skipped check timeout (%d remaining)", k.config.SkipConsumerTimeouts-1)
			k.config.SkipConsumerTimeouts--
			return details, nil
		}
		details["consumer"] = err.Error()
		return details, errors.New("error receiving messages")
	}

	// Set the property to zero as soon as we complete a roundtrip successfully.
	k.config.SkipConsumerTimeouts = 0
	details["info"] = fmt.Sprintf("Check completed in %v", time.Since(checkStart))
	return details, nil
}
// sendMessage produces a freshly generated random (UUID) message to the
// configured health check topic and blocks until either the delivery report
// arrives or ctx expires. It returns the message payload so the caller can
// match it on the consumer side; deliveryErr is a *checkTimeoutError when ctx
// expired first, or the broker-reported delivery error (possibly nil).
func (k *Kafka) sendMessage(ctx context.Context) (string, error) {
	payload := uuid.New().String()

	produceErr := k.producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &k.config.Topic, Partition: kafka.PartitionAny},
		Value:          []byte(payload),
	}, nil)
	if produceErr != nil {
		return "", produceErr
	}

	var (
		wg          sync.WaitGroup
		deliveryErr error
	)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				deliveryErr = &checkTimeoutError{message: "check timeout while waiting for the message delivery report"}
				return
			case ev := <-k.producer.Events():
				// Only message events carry the delivery report; ignore the rest.
				if report, ok := ev.(*kafka.Message); ok {
					deliveryErr = report.TopicPartition.Error
					return
				}
			}
		}
	}()
	wg.Wait()

	return payload, deliveryErr
}
// consumeMessage polls the consumer until the expected message shows up,
// returning nil on success. It returns a *checkTimeoutError when ctx expires
// first (note: if the deadline was already reached while producing, this
// triggers immediately), or the kafka error reported by the consumer.
func (k *Kafka) consumeMessage(ctx context.Context, expectedMessage string) error {
	pollMs := int(k.config.PollTimeout / time.Millisecond)
	for {
		// Bail out as soon as the overall check deadline has passed.
		if ctx.Err() != nil {
			return &checkTimeoutError{message: "check timeout while consuming messages"}
		}
		switch event := k.consumer.Poll(pollMs).(type) {
		case nil:
			// Nothing arrived within the poll window; keep polling.
		case *kafka.Message:
			if string(event.Value) == expectedMessage {
				return nil
			}
			// Any other message (e.g. from another instance) is ignored.
		case kafka.Error:
			return event
		}
	}
}