Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Signficantly improve inhibitor performance via new cache datastructure #4134

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 98 additions & 30 deletions inhibit/inhibit.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/pkg/labels"
"github.com/prometheus/alertmanager/provider"
"github.com/prometheus/alertmanager/store"
"github.com/prometheus/alertmanager/types"
)

Expand Down Expand Up @@ -72,9 +71,7 @@ func (ih *Inhibitor) run(ctx context.Context) {
// Update the inhibition rules' cache.
for _, r := range ih.rules {
if r.SourceMatchers.Matches(a.Labels) {
if err := r.scache.Set(a); err != nil {
ih.logger.Error("error on set alert", "err", err)
}
r.set(a)
}
}
}
Expand All @@ -93,8 +90,27 @@ func (ih *Inhibitor) Run() {
ih.mtx.Unlock()
runCtx, runCancel := context.WithCancel(ctx)

for _, rule := range ih.rules {
go rule.scache.Run(runCtx, 15*time.Minute)
for _, r := range ih.rules {
go func(r *InhibitRule) {
ticker := time.NewTicker(15 * time.Minute)
select {
case <-runCtx.Done():
return
case <-ticker.C:
r.mtx.Lock()
for icacheKey, cacheEntry := range r.icache {
for fp, cachedAlert := range cacheEntry {
if cachedAlert.alert.Resolved() {
delete(cacheEntry, fp)
}
}
if len(cacheEntry) == 0 {
delete(r.icache, icacheKey)
}
}
r.mtx.Unlock()
}
}(r)
}

g.Add(func() error {
Expand Down Expand Up @@ -126,24 +142,40 @@ func (ih *Inhibitor) Stop() {
// interface.
func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {
fp := lset.Fingerprint()
now := time.Now()

for _, r := range ih.rules {
if !r.TargetMatchers.Matches(lset) {
// If target side of rule doesn't match, we don't need to look any further.
continue
}
// If we are here, the target side matches. If the source side matches, too, we
// need to exclude inhibiting alerts for which the same is true.
if inhibitedByFP, eq := r.hasEqual(lset, r.SourceMatchers.Matches(lset)); eq {
ih.marker.SetInhibited(fp, inhibitedByFP.String())
// we know that the target side matches, but we don't know if this alert
// is actually inhibited yet - let the InhibitRule figure that out.
if inhibiting, matches := r.findInhibitor(lset, now); matches {
ih.marker.SetInhibited(fp, inhibiting.String())
return true
}
}

ih.marker.SetInhibited(fp)

return false
}

type cachedAlert struct {
alert *types.Alert
matchesSourceAndTarget bool
}

func newCachedAlert(a *types.Alert, targetMatchers labels.Matchers) cachedAlert {
return cachedAlert{
alert: a,
matchesSourceAndTarget: targetMatchers.Matches(a.Labels),
}
}

type iCacheEntry map[model.Fingerprint]cachedAlert

// An InhibitRule specifies that a class of (source) alerts should inhibit
// notifications for another class of (target) alerts if all specified matching
// labels are equal between the two alerts. This may be used to inhibit alerts
Expand All @@ -161,7 +193,9 @@ type InhibitRule struct {
Equal map[model.LabelName]struct{}

// Cache of alerts matching source labels.
scache *store.Alerts
icache map[model.Fingerprint]iCacheEntry

mtx *sync.RWMutex
}

// NewInhibitRule returns a new InhibitRule based on a configuration definition.
Expand Down Expand Up @@ -221,30 +255,64 @@ func NewInhibitRule(cr config.InhibitRule) *InhibitRule {
SourceMatchers: sourcem,
TargetMatchers: targetm,
Equal: equal,
scache: store.NewAlerts(),
icache: make(map[model.Fingerprint]iCacheEntry),
mtx: &sync.RWMutex{},
}
}

// hasEqual checks whether the source cache contains alerts matching the equal
// labels for the given label set. If so, the fingerprint of one of those alerts
// is returned. If excludeTwoSidedMatch is true, alerts that match both the
// source and the target side of the rule are disregarded.
func (r *InhibitRule) hasEqual(lset model.LabelSet, excludeTwoSidedMatch bool) (model.Fingerprint, bool) {
Outer:
for _, a := range r.scache.List() {
// The cache might be stale and contain resolved alerts.
if a.Resolved() {
continue
}
for n := range r.Equal {
if a.Labels[n] != lset[n] {
continue Outer
func (r *InhibitRule) set(a *types.Alert) {
// these two operations are by far the most expensive part of the method
// since they don't require hilding the mutex, call them here as a tiny
// optimization
icacheKey := r.icacheKey(a.Labels)
fp := a.Fingerprint()

r.mtx.Lock()
defer r.mtx.Unlock()

cacheEntry, ok := r.icache[icacheKey]
if !ok {
cacheEntry = make(iCacheEntry)
r.icache[icacheKey] = cacheEntry
}

cacheEntry[fp] = newCachedAlert(a, r.TargetMatchers)
}

func (r *InhibitRule) icacheKey(lset model.LabelSet) model.Fingerprint {
equalLabels := model.LabelSet{}
for label := range r.Equal {
equalLabels[label] = lset[label]
}
return equalLabels.Fingerprint()
}

// findInhibitor determines if any alert inhibits an lset that matches the target
// matchers. The fingerprint of the first matching result is returned.
func (r *InhibitRule) findInhibitor(lset model.LabelSet, now time.Time) (model.Fingerprint, bool) {
r.mtx.RLock()
defer r.mtx.RUnlock()

var sourceMatchersEvaluated, lsetMatchesSource bool
if cacheEntry, ok := r.icache[r.icacheKey(lset)]; ok {
for fp, cachedAlert := range cacheEntry {
if cachedAlert.alert.ResolvedAt(now) {
continue
}

if cachedAlert.matchesSourceAndTarget {
if !sourceMatchersEvaluated {
lsetMatchesSource = r.SourceMatchers.Matches(lset)
sourceMatchersEvaluated = true
}
if lsetMatchesSource {
continue
}
}

return fp, true
}
if excludeTwoSidedMatch && r.TargetMatchers.Matches(a.Labels) {
continue Outer
}
return a.Fingerprint(), true
}

return model.Fingerprint(0), false
}
38 changes: 24 additions & 14 deletions inhibit/inhibit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,18 @@
package inhibit

import (
"sync"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"github.com/stretchr/testify/require"

"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/pkg/labels"
"github.com/prometheus/alertmanager/provider"
"github.com/prometheus/alertmanager/store"
"github.com/prometheus/alertmanager/types"
)

Expand Down Expand Up @@ -122,18 +123,27 @@ func TestInhibitRuleHasEqual(t *testing.T) {

for _, c := range cases {
r := &InhibitRule{
Equal: map[model.LabelName]struct{}{},
scache: store.NewAlerts(),
Equal: map[model.LabelName]struct{}{},
mtx: &sync.RWMutex{},
TargetMatchers: make(labels.Matchers, 0),
SourceMatchers: make(labels.Matchers, 0),
icache: make(map[model.Fingerprint]iCacheEntry),
}
for _, ln := range c.equal {
r.Equal[ln] = struct{}{}
}
for _, v := range c.initial {
r.scache.Set(v)
r.set(v)
}

if _, have := r.hasEqual(c.input, false); have != c.result {
t.Errorf("Unexpected result %t, expected %t", have, c.result)
matcher, err := labels.NewMatcher(labels.MatchEqual, "notareallabel", "notarealvalue")
require.NoError(t, err)
r.SourceMatchers = append(r.SourceMatchers, matcher)

_, hasMatch := r.findInhibitor(c.input, time.Now())

if hasMatch != c.result {
t.Errorf("Unexpected result %t, expected %t", hasMatch, c.result)
}
}
}
Expand Down Expand Up @@ -172,10 +182,10 @@ func TestInhibitRuleMatches(t *testing.T) {
},
}

ih.rules[0].scache = store.NewAlerts()
ih.rules[0].scache.Set(sourceAlert1)
ih.rules[1].scache = store.NewAlerts()
ih.rules[1].scache.Set(sourceAlert2)
ih.rules[0].icache = make(map[model.Fingerprint]iCacheEntry)
ih.rules[0].set(sourceAlert1)
ih.rules[1].icache = make(map[model.Fingerprint]iCacheEntry)
ih.rules[1].set(sourceAlert2)

cases := []struct {
target model.LabelSet
Expand Down Expand Up @@ -268,10 +278,10 @@ func TestInhibitRuleMatchers(t *testing.T) {
},
}

ih.rules[0].scache = store.NewAlerts()
ih.rules[0].scache.Set(sourceAlert1)
ih.rules[1].scache = store.NewAlerts()
ih.rules[1].scache.Set(sourceAlert2)
ih.rules[0].icache = make(map[model.Fingerprint]iCacheEntry)
ih.rules[0].set(sourceAlert1)
ih.rules[1].icache = make(map[model.Fingerprint]iCacheEntry)
ih.rules[1].set(sourceAlert2)

cases := []struct {
target model.LabelSet
Expand Down
Loading