Skip to content

Commit

Permalink
add radix tree to memposting for prefix search
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Jul 25, 2024
1 parent 71c90c7 commit 30b3cdc
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 39 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/KimMachineGun/automemlimit v0.6.1
github.com/alecthomas/kingpin/v2 v2.4.0
github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30
github.com/armon/go-radix v1.0.0
github.com/aws/aws-sdk-go v1.54.19
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3
github.com/cespare/xxhash/v2 v2.3.0
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV
github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA=
github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI=
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A=
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so=
Expand Down
6 changes: 3 additions & 3 deletions tsdb/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type IndexReader interface {

// PostingsForLabelMatching returns a sorted iterator over postings having a label with the given name and a value for which match returns true.
// If no postings are found having at least one matching label, an empty iterator is returned.
PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool) index.Postings
PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool, prefix string) index.Postings

// SortedPostings returns a postings list that is reordered to be sorted
// by the label set of the underlying series.
Expand Down Expand Up @@ -522,8 +522,8 @@ func (r blockIndexReader) Postings(ctx context.Context, name string, values ...s
return p, nil
}

func (r blockIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
return r.ir.PostingsForLabelMatching(ctx, name, match)
func (r blockIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool, prefix string) index.Postings {
return r.ir.PostingsForLabelMatching(ctx, name, match, prefix)
}

func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
Expand Down
4 changes: 2 additions & 2 deletions tsdb/head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ func (h *headIndexReader) Postings(ctx context.Context, name string, values ...s
}
}

func (h *headIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
return h.head.postings.PostingsForLabelMatching(ctx, name, match)
func (h *headIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool, prefix string) index.Postings {
return h.head.postings.PostingsForLabelMatching(ctx, name, match, prefix)
}

func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
Expand Down
2 changes: 1 addition & 1 deletion tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1769,7 +1769,7 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P
return Merge(ctx, res...), nil
}

func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool, _ string) Postings {
if r.version == FormatV1 {
return r.postingsForLabelMatchingV1(ctx, name, match)
}
Expand Down
75 changes: 47 additions & 28 deletions tsdb/index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"strings"
"sync"

"github.com/armon/go-radix"
"github.com/bboreham/go-loser"

"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -55,14 +56,19 @@ var ensureOrderBatchPool = sync.Pool{
// unordered batch fills on startup.
type MemPostings struct {
mtx sync.RWMutex
m map[string]map[string][]storage.SeriesRef
m map[string]postingEntry
ordered bool
}

// postingEntry holds the in-memory postings of a single label name.
type postingEntry struct {
// values maps each label value to the series refs recorded for it.
values map[string][]storage.SeriesRef
// trie is keyed by the same label values (inserted in addFor) so that
// labelValues can enumerate values by prefix instead of scanning the map.
trie *radix.Tree
}

// NewMemPostings returns a memPostings that's ready for reads and writes.
func NewMemPostings() *MemPostings {
return &MemPostings{
m: make(map[string]map[string][]storage.SeriesRef, 512),
m: make(map[string]postingEntry, 512),
ordered: true,
}
}
Expand All @@ -71,7 +77,7 @@ func NewMemPostings() *MemPostings {
// until EnsureOrder() was called once.
func NewUnorderedMemPostings() *MemPostings {
return &MemPostings{
m: make(map[string]map[string][]storage.SeriesRef, 512),
m: make(map[string]postingEntry, 512),
ordered: false,
}
}
Expand All @@ -84,7 +90,7 @@ func (p *MemPostings) Symbols() StringIter {
symbols := make(map[string]struct{}, 512)
for n, e := range p.m {
symbols[n] = struct{}{}
for v := range e {
for v := range e.values {
symbols[v] = struct{}{}
}
}
Expand All @@ -105,7 +111,7 @@ func (p *MemPostings) SortedKeys() []labels.Label {
keys := make([]labels.Label, 0, len(p.m))

for n, e := range p.m {
for v := range e {
for v := range e.values {
keys = append(keys, labels.Label{Name: n, Value: v})
}
}
Expand Down Expand Up @@ -146,8 +152,8 @@ func (p *MemPostings) LabelValues(_ context.Context, name string) []string {
p.mtx.RLock()
defer p.mtx.RUnlock()

values := make([]string, 0, len(p.m[name]))
for v := range p.m[name] {
values := make([]string, 0, len(p.m[name].values))
for v := range p.m[name].values {
values = append(values, v)
}
return values
Expand Down Expand Up @@ -182,10 +188,10 @@ func (p *MemPostings) Stats(label string, limit int) *PostingsStats {
if n == "" {
continue
}
labels.push(Stat{Name: n, Count: uint64(len(e))})
numLabelPairs += len(e)
labels.push(Stat{Name: n, Count: uint64(len(e.values))})
numLabelPairs += len(e.values)
size = 0
for name, values := range e {
for name, values := range e.values {
if n == label {
metrics.push(Stat{Name: name, Count: uint64(len(values))})
}
Expand All @@ -212,8 +218,8 @@ func (p *MemPostings) Get(name, value string) Postings {
var lp []storage.SeriesRef
p.mtx.RLock()
l := p.m[name]
if l != nil {
lp = l[value]
if l.values != nil {
lp = l.values[value]
}
p.mtx.RUnlock()

Expand Down Expand Up @@ -266,7 +272,7 @@ func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) {

nextJob := ensureOrderBatchPool.Get().(*[][]storage.SeriesRef)
for _, e := range p.m {
for _, l := range e {
for _, l := range e.values {
*nextJob = append(*nextJob, l)

if len(*nextJob) >= ensureOrderBatchSize {
Expand Down Expand Up @@ -294,20 +300,22 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
defer p.mtx.Unlock()

process := func(l labels.Label) {
orig := p.m[l.Name][l.Value]
orig := p.m[l.Name].values[l.Value]
repl := make([]storage.SeriesRef, 0, len(orig))
for _, id := range orig {
if _, ok := deleted[id]; !ok {
repl = append(repl, id)
}
}
if len(repl) > 0 {
p.m[l.Name][l.Value] = repl
p.m[l.Name].values[l.Value] = repl
} else {
delete(p.m[l.Name], l.Value)
delete(p.m[l.Name].values, l.Value)
// Delete the key if we removed all values.
if len(p.m[l.Name]) == 0 {
if len(p.m[l.Name].values) == 0 {
delete(p.m, l.Name)
} else {
p.m[l.Name].trie.Delete(l.Value)
}
}
}
Expand All @@ -324,7 +332,7 @@ func (p *MemPostings) Iter(f func(labels.Label, Postings) error) error {
defer p.mtx.RUnlock()

for n, e := range p.m {
for v, p := range e {
for v, p := range e.values {
if err := f(labels.Label{Name: n, Value: v}, newListPostings(p...)); err != nil {
return err
}
Expand All @@ -348,11 +356,15 @@ func (p *MemPostings) Add(id storage.SeriesRef, lset labels.Labels) {
func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
nm, ok := p.m[l.Name]
if !ok {
nm = map[string][]storage.SeriesRef{}
nm = postingEntry{
values: map[string][]storage.SeriesRef{},
trie: radix.New(),
}
p.m[l.Name] = nm
}
list := append(nm[l.Value], id)
nm[l.Value] = list
list := append(nm.values[l.Value], id)
nm.values[l.Value] = list
nm.trie.Insert(l.Value, struct{}{})

if !p.ordered {
return
Expand All @@ -369,13 +381,13 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
}
}

func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool, prefix string) Postings {
// We'll copy the values into a slice and then match over that,
// this way we don't need to hold the mutex while we're matching,
// which can be slow (seconds) if the match function is a huge regex.
// Holding this lock prevents new series from being added (slows down the write path)
// and blocks the compaction process.
vals := p.labelValues(name)
vals := p.labelValues(name, prefix)
for i, count := 0, 1; i < len(vals); count++ {
if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
return ErrPostings(ctx.Err())
Expand All @@ -401,7 +413,7 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string,
p.mtx.RLock()
e := p.m[name]
for _, v := range vals {
if refs, ok := e[v]; ok {
if refs, ok := e.values[v]; ok {
// Some of the values may have been garbage-collected in the meantime; this is fine, we'll just skip them.
// If we didn't let the mutex go, we'd have these postings here, but they would be pointing nowhere
// because there would be a `MemPostings.Delete()` call waiting for the lock to delete these labels,
Expand All @@ -417,17 +429,24 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string,

// labelValues returns a slice of label values for the given label name.
// If prefix is non-empty, only values starting with prefix are returned.
// It will take the read lock.
func (p *MemPostings) labelValues(name string) []string {
func (p *MemPostings) labelValues(name string, prefix string) []string {
p.mtx.RLock()
defer p.mtx.RUnlock()

e := p.m[name]
if len(e) == 0 {
if len(e.values) == 0 {
return nil
}

vals := make([]string, 0, len(e))
for v, srs := range e {
vals := make([]string, 0, len(e.values))
if len(prefix) > 0 {
e.trie.WalkPrefix(prefix, func(s string, _ interface{}) bool {
vals = append(vals, s)
return false
})
return vals
}
for v, srs := range e.values {
if len(srs) > 0 {
vals = append(vals, v)
}
Expand Down
2 changes: 1 addition & 1 deletion tsdb/ooo_head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func (ir *OOOCompactionHeadIndexReader) Postings(_ context.Context, name string,
return index.NewListPostings(ir.ch.postings), nil
}

func (ir *OOOCompactionHeadIndexReader) PostingsForLabelMatching(context.Context, string, func(string) bool) index.Postings {
func (ir *OOOCompactionHeadIndexReader) PostingsForLabelMatching(context.Context, string, func(string) bool, string) index.Postings {
return index.ErrPostings(errors.New("not supported"))
}

Expand Down
2 changes: 1 addition & 1 deletion tsdb/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func postingsForMatcher(ctx context.Context, ix IndexReader, m *labels.Matcher)
}
}

it := ix.PostingsForLabelMatching(ctx, m.Name, m.Matches)
it := ix.PostingsForLabelMatching(ctx, m.Name, m.Matches, m.Prefix())
return it, it.Err()
}

Expand Down
6 changes: 3 additions & 3 deletions tsdb/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2330,7 +2330,7 @@ func (m mockIndex) SortedPostings(p index.Postings) index.Postings {
return index.NewListPostings(ep)
}

func (m mockIndex) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
func (m mockIndex) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool, prefix string) index.Postings {
var res []index.Postings
for l, srs := range m.postings {
if l.Name == name && match(l.Value) {
Expand Down Expand Up @@ -3259,7 +3259,7 @@ func (m mockMatcherIndex) LabelNames(context.Context, ...*labels.Matcher) ([]str
return []string{}, nil
}

func (m mockMatcherIndex) PostingsForLabelMatching(context.Context, string, func(string) bool) index.Postings {
func (m mockMatcherIndex) PostingsForLabelMatching(context.Context, string, func(string) bool, string) index.Postings {
return index.ErrPostings(fmt.Errorf("PostingsForLabelMatching called"))
}

Expand Down Expand Up @@ -3700,7 +3700,7 @@ func (m mockReaderOfLabels) LabelNamesFor(context.Context, index.Postings) ([]st
panic("LabelNamesFor called")
}

func (m mockReaderOfLabels) PostingsForLabelMatching(context.Context, string, func(string) bool) index.Postings {
func (m mockReaderOfLabels) PostingsForLabelMatching(context.Context, string, func(string) bool, string) index.Postings {
panic("PostingsForLabelMatching called")
}

Expand Down

0 comments on commit 30b3cdc

Please sign in to comment.