Skip to content

Commit

Permalink
fix?
Browse files Browse the repository at this point in the history
  • Loading branch information
torywheelwright committed Apr 2, 2024
1 parent af79b01 commit 9476095
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions lib/redispub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,25 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts

var inChans []chan *Publication
var outChans []chan error
var sendFailedMetrics []prometheus.Counter
var sendSucceededMetrics []prometheus.Counter

defer func () {
for _, c := range(inChans) {
close(c)
}
}()

for i, client := range clients {
clientIdx := i
for clientIdx, client := range clients {
clientIdx := clientIdx
clientIdxStr := strconv.FormatInt(int64(clientIdx), 10)
client := client
inChan := make(chan *Publication)
inChans = append(inChans, inChan)
outChan := make(chan error)
outChans = append(outChans, outChan)
sendSucceededMetrics = append(sendSucceededMetrics, metricSentMessages.WithLabelValues("sent", clientIdxStr))
sendFailedMetrics = append(sendFailedMetrics, metricSentMessages.WithLabelValues("failed", clientIdxStr))

go func() {
defer close(outChan)
Expand Down Expand Up @@ -113,10 +118,9 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts

for clientIdx, outChan := range outChans {
err := <-outChan
clientIdxStr := strconv.FormatInt(int64(clientIdx), 10)

if err != nil {
metricSentMessages.WithLabelValues("failed", clientIdxStr).Inc()
sendFailedMetrics[clientIdx].Inc()
log.Log.Errorw(
"Permanent error while trying to publish message; giving up",
"clientIdx", clientIdx,
Expand All @@ -125,7 +129,7 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts

)
} else {
metricSentMessages.WithLabelValues("sent", clientIdxStr).Inc()
sendSucceededMetrics[clientIdx].Inc()
}
}

Expand Down

0 comments on commit 9476095

Please sign in to comment.