Skip to content

Commit

Permalink
(#83) Add Heartbeats to streams
Browse files Browse the repository at this point in the history
Here we add the ability to configure heartbeat messages for streams.

By adding a new heartbeat section to the configuration file and specifying the
subjects to publish to, stream-replicator will send a message with a Unix
timestamp as the body at a predetermined interval. This can be used to monitor
availability when streams are mostly idle.

The heartbeat package will spawn a goroutine for every configured subject and
send a message at each interval. The heartbeat publisher is leader-aware and
adds relevant Prometheus metrics.
  • Loading branch information
ploubser committed Mar 15, 2023
1 parent 2d5402f commit 0eae642
Show file tree
Hide file tree
Showing 8 changed files with 608 additions and 1 deletion.
13 changes: 13 additions & 0 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/choria-io/stream-replicator/advisor"
"github.com/choria-io/stream-replicator/heartbeat"
"github.com/choria-io/stream-replicator/idtrack"
"github.com/choria-io/tokens"
"github.com/nats-io/jsm.go"
Expand Down Expand Up @@ -449,6 +450,18 @@ func (c *cmd) replicateAction(_ *fisk.ParseContext) error {
}(s)
}

if cfg.HeartBeat != nil {
hb, err := heartbeat.New(cfg.HeartBeat, cfg.ReplicatorName, c.log)
if err != nil {
c.log.Errorf("Could not initialize heartbeat: %v", err)
} else {
err = hb.Run(ctx, wg)
if err != nil {
c.log.Errorf("Could not start heartbeat: %v", err)
}
}
}

wg.Wait()

return nil
Expand Down
54 changes: 54 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type Config struct {
LogFile string `json:"logfile"`
// LogLevel is the logging level: debug, warn or info
LogLevel string `json:"loglevel"`
// Heartbeat defines monitoring heartbeats
HeartBeat *HeartBeat `json:"heartbeats"`
}

type Stream struct {
Expand Down Expand Up @@ -108,6 +110,32 @@ type Stream struct {
StateFile string `json:"-"`
}

// HeartBeat configures the optional heartbeat publisher. Validate requires a
// URL and at least one subject whenever this section is present.
type HeartBeat struct {
// LeaderElection indicates that this replicator is part of a group and will elect a leader to send heartbeats
LeaderElection bool `json:"leader_election"`
// Interval determines how often a heartbeat message will be sent
Interval string `json:"interval"`
// Headers are custom headers to add to the heartbeat message
Headers map[string]string `json:"headers"`
// Subjects are the subjects the heartbeat messages will be sent to
Subjects []Subject `json:"subjects"`
// TLS is the TLS settings that would be used
TLS TLS `json:"tls"`
// Choria is the Choria settings that would be used
Choria ChoriaConnection `json:"choria"`
// URL is the url of the nats broker
URL string `json:"url"`
}

// Subject configures a single heartbeat target. Validate rejects a subject
// whose Name is empty.
type Subject struct {
// Name is the name of the subject; note the JSON key is "subject"
Name string `json:"subject"`
// Interval determines how often a heartbeat message will be sent
Interval string `json:"interval"`
// Headers are custom headers to add to the heartbeat message
Headers map[string]string `json:"headers"`
}

type Advisory struct {
// Subject is the NATS subject to publish messages to, a %s in the string will be replaced by the event type, %v with the value
Subject string `json:"subject"`
Expand Down Expand Up @@ -289,6 +317,32 @@ func (c *Config) Validate() (err error) {
}
}

if c.HeartBeat != nil {
if c.HeartBeat.URL == "" {
return fmt.Errorf("url is required with heartbeat")
}

_, err = util.ParseDurationString(c.HeartBeat.Interval)
if err != nil {
return fmt.Errorf("invalid interval: %v", err)
}

if len(c.HeartBeat.Subjects) == 0 {
return fmt.Errorf("heartbeat requires at least one subject")
}

for _, subject := range c.HeartBeat.Subjects {
if subject.Name == "" {
return fmt.Errorf("name is required with subject")
}

_, err = util.ParseDurationString(subject.Interval)
if err != nil {
return fmt.Errorf("invalid interval: %v", err)
}
}
}

return nil
}

Expand Down
25 changes: 25 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,5 +160,30 @@ var _ = Describe("Config", func() {
Expect(cfg.Validate()).To(MatchError("only one inspection mode can be set per stream"))

})

It("Should validate the required fields in heartbeat", func() {
cfg.HeartBeat = &HeartBeat{}
Expect(cfg.Validate()).To(MatchError("url is required with heartbeat"))

cfg.HeartBeat.URL = "nats://foo.com:4222"
Expect(cfg.Validate()).To(MatchError("heartbeat requires at least one subject"))

cfg.HeartBeat.Subjects = []Subject{
{},
}
Expect(cfg.Validate()).To(MatchError("name is required with subject"))

cfg.HeartBeat.Subjects = []Subject{
{
Name: "s.1",
},
}

cfg.HeartBeat.Interval = "1o"
Expect(cfg.Validate()).To(MatchError("invalid interval: invalid time unit o"))

cfg.HeartBeat.Interval = "1s"
Expect(cfg.Validate()).ToNot(HaveOccurred())
})
})
})
5 changes: 5 additions & 0 deletions election/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,8 @@ func WithDebug(cb func(format string, a ...any)) Option {
// WithReplicator returns an Option that stores r in the replicator field of
// the election options.
func WithReplicator(r string) Option {
	return func(opts *options) {
		opts.replicator = r
	}
}

// SkipTTLValidateForTests turns off Bucket TTL validation for testing.
//
// It sets the skipValidate flag to true; this function offers no way to turn
// validation back on, so the setting persists for the rest of the process.
func SkipTTLValidateForTests() {
skipValidate = true
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/onsi/ginkgo/v2 v2.8.3
github.com/onsi/gomega v1.27.1
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/segmentio/ksuid v1.0.4
github.com/sirupsen/logrus v1.9.0
github.com/tidwall/gjson v1.14.4
Expand All @@ -35,7 +36,6 @@ require (
github.com/nats-io/jwt/v2 v2.3.0 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.39.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
Expand Down
226 changes: 226 additions & 0 deletions heartbeat/heartbeat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
// Copyright (c) 2022-2023, R.I. Pienaar and the Choria Project contributors
//
// SPDX-License-Identifier: Apache-2.0

// Package heartbeat defines the heartbeat system that starts up alongside the stream replicator.
// If configured it will send heartbeat messages to the configured subjects on a configurable interval.
package heartbeat

import (
"context"
"fmt"
"os"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/choria-io/stream-replicator/backoff"
"github.com/choria-io/stream-replicator/config"
"github.com/choria-io/stream-replicator/election"
"github.com/choria-io/stream-replicator/internal/util"
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

const (
// DefaultUpdateInterval is the heartbeat interval New uses when the
// configuration does not set one.
DefaultUpdateInterval = "10s"
// OriginatorHeader is the message header that carries the hostname of the
// machine publishing the heartbeat; it is omitted when the hostname is empty.
OriginatorHeader = "Choria-SR-Originator"
// SubjectHeader is the message header that carries the subject the
// heartbeat is published to.
SubjectHeader = "Choria-SR-Subject"
)

var (
// enableBackoff makes heartBeatWorker hold its ticker at one hour and only
// switch it to the subject interval after a delay taken from
// backoff.FiveSec. When false the ticker runs at the subject interval
// from the start.
enableBackoff = true
)

// HeartBeat publishes heartbeat messages to a set of subjects. It is built by
// New and started with Run.
type HeartBeat struct {
// url is the NATS server URL passed to util.ConnectNats
url string
// replicatorName is used as a metric label and in log messages
replicatorName string
// electionName is the name passed to election.NewElection; New sets it to "<replicatorName>_HB"
electionName string
// tls and choria are the connection settings passed to util.ConnectNats
tls config.TLS
choria config.ChoriaConnection
// leaderElection enables the leader election set up in Run
leaderElection bool
// interval is the interval string used for subjects that do not set their own
interval string
// headers are added to every subject that does not already set the same key
headers map[string]string
// subjects are the targets Run starts a worker for
subjects []*Subject
log *logrus.Entry
// paused is true while workers must skip publishing; New initialises it to
// leaderElection and the election callbacks flip it
paused atomic.Bool
}

// Subject is one resolved heartbeat target as used by heartBeatWorker.
type Subject struct {
// name is the subject heartbeats are published to
name string
// interval is the time between heartbeats
interval time.Duration
// headers are set on every heartbeat message for this subject
headers map[string]string
}

// New creates a HeartBeat from the supplied configuration.
//
// The heartbeat interval defaults to DefaultUpdateInterval when hbcfg.Interval
// is empty, and every subject without its own interval inherits it. Headers set
// on the heartbeat section are merged into each subject's headers; when both
// define the same key the subject's value wins.
//
// The header maps are copied, so hbcfg is never modified and later changes to
// hbcfg do not affect the returned HeartBeat.
//
// When leader election is enabled the HeartBeat starts out paused.
//
// An error is returned when hbcfg is nil or when a subject interval cannot be
// parsed by util.ParseDurationString.
func New(hbcfg *config.HeartBeat, replicatorName string, log *logrus.Entry) (*HeartBeat, error) {
	if hbcfg == nil {
		return nil, fmt.Errorf("heartbeat configuration is required")
	}

	hb := &HeartBeat{
		replicatorName: replicatorName,
		electionName:   fmt.Sprintf("%s_HB", replicatorName),
		tls:            hbcfg.TLS,
		choria:         hbcfg.Choria,
		leaderElection: hbcfg.LeaderElection,
		headers:        make(map[string]string, len(hbcfg.Headers)),
		interval:       DefaultUpdateInterval,
		log:            log,
		url:            hbcfg.URL,
	}

	// Copy rather than alias the configured map so the configuration stays untouched.
	for k, v := range hbcfg.Headers {
		hb.headers[k] = v
	}

	hb.paused.Store(hb.leaderElection)

	if hbcfg.Interval != "" {
		hb.interval = hbcfg.Interval
	}

	for _, s := range hbcfg.Subjects {
		interval := s.Interval
		if interval == "" {
			interval = hb.interval
		}

		d, err := util.ParseDurationString(interval)
		if err != nil {
			return nil, err
		}

		// Start from the shared headers and then apply the subject's own,
		// so that subject specific values take precedence.
		headers := make(map[string]string, len(hb.headers)+len(s.Headers))
		for k, v := range hb.headers {
			headers[k] = v
		}
		for k, v := range s.Headers {
			headers[k] = v
		}

		hb.subjects = append(hb.subjects, &Subject{
			name:     s.Name,
			interval: d,
			headers:  headers,
		})
	}

	return hb, nil
}

// Run connects to NATS, optionally sets up leader election and then spawns one
// goroutine per configured subject that publishes a heartbeat message on that
// subject's interval.
//
// Every spawned goroutine is registered on wg and exits when ctx is done. Run
// itself returns as soon as the workers are started.
//
// An error is returned when the connection, the JetStream context or the
// leader election cannot be set up. In the latter two cases the connection
// opened here is closed again and no worker is started.
func (hb *HeartBeat) Run(ctx context.Context, wg *sync.WaitGroup) error {
	nc, err := util.ConnectNats(ctx, "subject-heartbeats", hb.url, &hb.tls, &hb.choria, false, hb.log)
	if err != nil {
		return err
	}

	// The JetStream context is created before the election is started so that
	// a failure here never leaves an election goroutine behind.
	js, err := nc.JetStream()
	if err != nil {
		nc.Close()
		return fmt.Errorf("unable to create jetstream context: %w", err)
	}

	if hb.leaderElection {
		err = hb.setupElection(ctx, nc)
		if err != nil {
			nc.Close()
			hb.log.Errorf("Could not set up elections: %v", err)
			return err
		}
	}

	for _, subject := range hb.subjects {
		wg.Add(1)
		hbSubjects.WithLabelValues(hb.replicatorName, subject.name).Inc()
		go heartBeatWorker(ctx, wg, subject, nc, js, hb.replicatorName, &hb.paused, hb.log.WithField("subject", subject.name))
	}

	return nil
}

// heartBeatWorker publishes a heartbeat message to sub.name every sub.interval
// until ctx is done, then calls wg.Done.
//
// The message body is the current Unix time in seconds as a decimal string. The
// message carries the subject's headers, SubjectHeader and, when the hostname
// is known, OriginatorHeader. Ticks that arrive while paused is true are
// skipped. Publish failures are logged and counted but do not stop the worker.
//
// When enableBackoff is set, the first heartbeat is delayed by the duration
// backoff.FiveSec.Duration(10) yields before the regular interval starts.
//
// nc is not used by the worker; it is kept so the signature stays unchanged.
func heartBeatWorker(ctx context.Context, wg *sync.WaitGroup, sub *Subject, nc *nats.Conn, js nats.JetStreamContext, replicatorName string, paused *atomic.Bool, log *logrus.Entry) {
	defer wg.Done()

	// time.NewTicker panics on a non-positive duration, which would take the
	// whole process down from inside this goroutine.
	if sub.interval <= 0 {
		log.Errorf("Not sending heartbeats: invalid interval %v", sub.interval)
		return
	}

	hostname, err := os.Hostname()
	if err != nil {
		log.Warn("Unable to determine hostname. Publishing heartbeats with empty 'originator' header.")
	}

	msg := nats.NewMsg(sub.name)
	for k, v := range sub.headers {
		msg.Header.Add(k, v)
	}

	if hostname != "" {
		msg.Header.Add(OriginatorHeader, hostname)
	}

	msg.Header.Add(SubjectHeader, sub.name)

	ticker := time.NewTicker(sub.interval)
	defer ticker.Stop()

	if enableBackoff {
		// Park the ticker and switch to the real interval once the start delay has passed.
		ticker.Reset(1 * time.Hour)
		start := time.AfterFunc(backoff.FiveSec.Duration(10), func() { ticker.Reset(sub.interval) })
		// Deferred after ticker.Stop so it runs first: a pending start timer
		// must not re-arm the ticker once the worker has returned.
		defer start.Stop()
	}

	for {
		select {
		case <-ticker.C:
			if paused.Load() {
				log.Debug("Not sending heartbeat when paused")
				continue
			}

			// FormatInt on the int64 avoids truncating the timestamp where int is 32 bits wide.
			msg.Data = []byte(strconv.FormatInt(time.Now().Unix(), 10))

			obs := prometheus.NewTimer(hbPublishTime.WithLabelValues(replicatorName, sub.name))
			log.Debugf("Sending heartbeat message")
			_, err := js.PublishMsg(msg, nats.AckWait(2*time.Second))
			obs.ObserveDuration()

			if err != nil {
				hbPublishedCtrErr.WithLabelValues(replicatorName, sub.name).Inc()
				log.Errorf("Unable to publish message to subject: %v", err)
			}

			// NOTE(review): this counter is incremented for failed publishes
			// too; confirm against the metric definition whether it is meant
			// to count attempts or successful publishes.
			hbPublishedCtr.WithLabelValues(replicatorName, sub.name).Inc()

		case <-ctx.Done():
			return
		}
	}
}

// setupElection starts a leader election in the CHORIA_LEADER_ELECTION key
// value bucket, using hb.electionName as the name and "heartbeat" as the key.
//
// Winning the election clears hb.paused and losing it sets hb.paused. The
// election itself runs in its own goroutine, which is handed ctx.
//
// An error is returned when the JetStream context, the bucket or the election
// cannot be created.
func (hb *HeartBeat) setupElection(ctx context.Context, nc *nats.Conn) error {
	jsCtx, err := nc.JetStream()
	if err != nil {
		return err
	}

	bucket, err := jsCtx.KeyValue("CHORIA_LEADER_ELECTION")
	if err != nil {
		return err
	}

	// NOTE(review): the hbPaused gauge is set to 1 when publishing resumes and
	// to 0 when it pauses, which looks inverted relative to hb.paused; confirm
	// against the metric definition before changing it.
	onWon := func() {
		hb.log.Warnf("%s became the leader", hb.replicatorName)
		hb.paused.Store(false)
		hbPaused.WithLabelValues(hb.replicatorName).Set(1.0)
	}

	onLost := func() {
		hb.log.Warnf("%s lost the leadership", hb.replicatorName)
		hb.paused.Store(true)
		hbPaused.WithLabelValues(hb.replicatorName).Set(0)
	}

	campaign, err := election.NewElection(
		hb.electionName,
		"heartbeat",
		bucket,
		election.WithBackoff(backoff.FiveSec),
		election.OnWon(onWon),
		election.OnLost(onLost),
	)
	if err != nil {
		return err
	}

	go campaign.Start(ctx)

	hb.log.Infof("Set up leader election 'heartbeat' using candidate name %s", hb.electionName)

	return nil
}
Loading

0 comments on commit 0eae642

Please sign in to comment.