From 6204558fbeed8bd082cb7a2936b42b4d02eeadcc Mon Sep 17 00:00:00 2001
From: SungJin1212
Date: Thu, 7 Nov 2024 17:53:44 +0900
Subject: [PATCH] Add prealloc timeseries v2

Signed-off-by: SungJin1212
---
 pkg/cortexpb/slicesPool.go          |   6 +-
 pkg/cortexpb/slicesPool_test.go     |  12 +--
 pkg/cortexpb/timeseries.go          |   6 +-
 pkg/cortexpbv2/timeseriesv2.go      | 141 ++++++++++++++++++++++++++++
 pkg/cortexpbv2/timeseriesv2_test.go | 109 +++++++++++++++++++++
 5 files changed, 262 insertions(+), 12 deletions(-)
 create mode 100644 pkg/cortexpbv2/timeseriesv2.go
 create mode 100644 pkg/cortexpbv2/timeseriesv2_test.go

diff --git a/pkg/cortexpb/slicesPool.go b/pkg/cortexpb/slicesPool.go
index e28d51d4f23..eaace237ce3 100644
--- a/pkg/cortexpb/slicesPool.go
+++ b/pkg/cortexpb/slicesPool.go
@@ -13,7 +13,7 @@ type byteSlicePools struct {
 	pools []sync.Pool
 }
 
-func newSlicePool(pools int) *byteSlicePools {
+func NewSlicePool(pools int) *byteSlicePools {
 	sp := byteSlicePools{}
 	sp.init(pools)
 	return &sp
@@ -32,7 +32,7 @@ func (sp *byteSlicePools) init(pools int) {
 	}
 }
 
-func (sp *byteSlicePools) getSlice(size int) *[]byte {
+func (sp *byteSlicePools) GetSlice(size int) *[]byte {
 	index := int(math.Ceil(math.Log2(float64(size)))) - minPoolSizePower
 
 	if index >= len(sp.pools) {
@@ -50,7 +50,7 @@ func (sp *byteSlicePools) getSlice(size int) *[]byte {
 	return s
 }
 
-func (sp *byteSlicePools) reuseSlice(s *[]byte) {
+func (sp *byteSlicePools) ReuseSlice(s *[]byte) {
 	index := int(math.Floor(math.Log2(float64(cap(*s))))) - minPoolSizePower
 
 	if index >= len(sp.pools) || index < 0 {
diff --git a/pkg/cortexpb/slicesPool_test.go b/pkg/cortexpb/slicesPool_test.go
index 9bc56cdec3f..dd35beb33a0 100644
--- a/pkg/cortexpb/slicesPool_test.go
+++ b/pkg/cortexpb/slicesPool_test.go
@@ -9,22 +9,22 @@
 )
 
 func TestFuzzyByteSlicePools(t *testing.T) {
-	sut := newSlicePool(20)
+	sut := NewSlicePool(20)
 	maxByteSize := int(math.Pow(2, 20+minPoolSizePower-1))
 
 	for i := 0; i < 1000; i++ {
 		size := rand.Int() % maxByteSize
-		s := sut.getSlice(size)
+		s := sut.GetSlice(size)
 		assert.Equal(t, len(*s), size)
-		sut.reuseSlice(s)
+		sut.ReuseSlice(s)
 	}
 }
 
 func TestReturnSliceSmallerThanMin(t *testing.T) {
-	sut := newSlicePool(20)
+	sut := NewSlicePool(20)
 	size := 3
 	buff := make([]byte, 0, size)
-	sut.reuseSlice(&buff)
-	buff2 := sut.getSlice(size * 2)
+	sut.ReuseSlice(&buff)
+	buff2 := sut.GetSlice(size * 2)
 	assert.Equal(t, len(*buff2), size*2)
 }
diff --git a/pkg/cortexpb/timeseries.go b/pkg/cortexpb/timeseries.go
index db7354ffe45..b880739ae2b 100644
--- a/pkg/cortexpb/timeseries.go
+++ b/pkg/cortexpb/timeseries.go
@@ -47,7 +47,7 @@
 			}
 		},
 	}
-	bytePool = newSlicePool(20)
+	bytePool = NewSlicePool(20)
 )
 
 // PreallocConfig configures how structures will be preallocated to optimise
@@ -86,7 +86,7 @@ func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error {
 
 func (p *PreallocWriteRequest) Marshal() (dAtA []byte, err error) {
 	size := p.Size()
-	p.data = bytePool.getSlice(size)
+	p.data = bytePool.GetSlice(size)
 	dAtA = *p.data
 	n, err := p.MarshalToSizedBuffer(dAtA[:size])
 	if err != nil {
@@ -97,7 +97,7 @@ func (p *PreallocWriteRequest) Marshal() (dAtA []byte, err error) {
 
 func ReuseWriteRequest(req *PreallocWriteRequest) {
 	if req.data != nil {
-		bytePool.reuseSlice(req.data)
+		bytePool.ReuseSlice(req.data)
 		req.data = nil
 	}
 	req.Source = 0
diff --git a/pkg/cortexpbv2/timeseriesv2.go b/pkg/cortexpbv2/timeseriesv2.go
new file mode 100644
index 00000000000..684e4bfac0f
--- /dev/null
+++ b/pkg/cortexpbv2/timeseriesv2.go
@@ -0,0 +1,141 @@
+package cortexpbv2
+
+import (
+	"sync"
+
+	"github.com/cortexproject/cortex/pkg/cortexpb"
+)
+
+var (
+	expectedTimeseries          = 100
+	expectedLabels              = 20
+	expectedSymbols             = 20
+	expectedSamplesPerSeries    = 10
+	expectedExemplarsPerSeries  = 1
+	expectedHistogramsPerSeries = 1
+
+	/*
+		We cannot pool these as pointer-to-slice because the place we use them is in WriteRequest which is generated from Protobuf
+		and we don't have an option to make it a pointer. There is overhead here 24 bytes of garbage every time a PreallocTimeseriesV2
+		is re-used. But since the slices are far far larger, we come out ahead.
+	*/
+	slicePool = sync.Pool{
+		New: func() interface{} {
+			return make([]PreallocTimeseriesV2, 0, expectedTimeseries)
+		},
+	}
+
+	timeSeriesPool = sync.Pool{
+		New: func() interface{} {
+			return &TimeSeries{
+				LabelsRefs: make([]uint32, 0, expectedLabels),
+				Samples:    make([]Sample, 0, expectedSamplesPerSeries),
+				Histograms: make([]Histogram, 0, expectedHistogramsPerSeries),
+				Exemplars:  make([]Exemplar, 0, expectedExemplarsPerSeries),
+				Metadata:   Metadata{},
+			}
+		},
+	}
+
+	writeRequestPool = sync.Pool{
+		New: func() interface{} {
+			return &PreallocWriteRequestV2{
+				WriteRequest: WriteRequest{
+					Symbols: make([]string, 0, expectedSymbols),
+				},
+			}
+		},
+	}
+	bytePool = cortexpb.NewSlicePool(20)
+)
+
+// PreallocWriteRequestV2 is a WriteRequest which preallocs slices on Unmarshal.
+type PreallocWriteRequestV2 struct {
+	WriteRequest
+	data *[]byte
+}
+
+// Unmarshal implements proto.Message.
+func (p *PreallocWriteRequestV2) Unmarshal(dAtA []byte) error {
+	p.Timeseries = PreallocTimeseriesV2SliceFromPool()
+	return p.WriteRequest.Unmarshal(dAtA)
+}
+
+func (p *PreallocWriteRequestV2) Marshal() (dAtA []byte, err error) {
+	size := p.Size()
+	p.data = bytePool.GetSlice(size)
+	dAtA = *p.data
+	n, err := p.MarshalToSizedBuffer(dAtA[:size])
+	if err != nil {
+		return nil, err
+	}
+	return dAtA[:n], nil
+}
+
+// PreallocTimeseriesV2 is a TimeSeries which preallocs slices on Unmarshal.
+type PreallocTimeseriesV2 struct {
+	*TimeSeries
+}
+
+// Unmarshal implements proto.Message.
+func (p *PreallocTimeseriesV2) Unmarshal(dAtA []byte) error {
+	p.TimeSeries = TimeseriesV2FromPool()
+	return p.TimeSeries.Unmarshal(dAtA)
+}
+
+func ReuseWriteRequestV2(req *PreallocWriteRequestV2) {
+	if req.data != nil {
+		bytePool.ReuseSlice(req.data)
+		req.data = nil
+	}
+	req.Source = 0
+	req.Symbols = nil
+	req.Timeseries = nil
+	writeRequestPool.Put(req)
+}
+
+func PreallocWriteRequestV2FromPool() *PreallocWriteRequestV2 {
+	return writeRequestPool.Get().(*PreallocWriteRequestV2)
+}
+
+// PreallocTimeseriesV2SliceFromPool retrieves a slice of PreallocTimeseriesV2 from a sync.Pool.
+// ReuseSlice should be called once done.
+func PreallocTimeseriesV2SliceFromPool() []PreallocTimeseriesV2 {
+	return slicePool.Get().([]PreallocTimeseriesV2)
+}
+
+// ReuseSlice puts the slice back into a sync.Pool for reuse.
+func ReuseSlice(ts []PreallocTimeseriesV2) {
+	for i := range ts {
+		ReuseTimeseries(ts[i].TimeSeries)
+	}
+
+	slicePool.Put(ts[:0]) //nolint:staticcheck //see comment on slicePool for more details
+}
+
+// TimeseriesV2FromPool retrieves a pointer to a TimeSeries from a sync.Pool.
+// ReuseTimeseries should be called once done, unless ReuseSlice was called on the slice that contains this TimeSeries.
+func TimeseriesV2FromPool() *TimeSeries {
+	return timeSeriesPool.Get().(*TimeSeries)
+}
+
+// ReuseTimeseries puts the timeseries back into a sync.Pool for reuse.
+func ReuseTimeseries(ts *TimeSeries) {
+	// Clear the TimeSeries' LabelsRefs and Samples, keeping the backing arrays.
+	ts.LabelsRefs = ts.LabelsRefs[:0]
+	ts.Samples = ts.Samples[:0]
+
+	// Clear each exemplar's LabelsRefs before truncating the exemplars slice.
+	for i := range ts.Exemplars {
+		ts.Exemplars[i].LabelsRefs = ts.Exemplars[i].LabelsRefs[:0]
+	}
+
+	for i := range ts.Histograms {
+		ts.Histograms[i].Reset()
+	}
+
+	ts.Exemplars = ts.Exemplars[:0]
+	ts.Histograms = ts.Histograms[:0]
+	ts.Metadata = Metadata{}
+	timeSeriesPool.Put(ts)
+}
diff --git a/pkg/cortexpbv2/timeseriesv2_test.go b/pkg/cortexpbv2/timeseriesv2_test.go
new file mode 100644
index 00000000000..d10527564e8
--- /dev/null
+++ b/pkg/cortexpbv2/timeseriesv2_test.go
@@ -0,0 +1,109 @@
+package cortexpbv2
+
+import (
+	"testing"
+
+	"github.com/gogo/protobuf/proto"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestPreallocTimeseriesV2SliceFromPool(t *testing.T) {
+	t.Run("new instance is provided when not available to reuse", func(t *testing.T) {
+		first := PreallocTimeseriesV2SliceFromPool()
+		second := PreallocTimeseriesV2SliceFromPool()
+
+		assert.NotSame(t, first, second)
+	})
+
+	t.Run("instance is cleaned before reusing", func(t *testing.T) {
+		slice := PreallocTimeseriesV2SliceFromPool()
+		slice = append(slice, PreallocTimeseriesV2{TimeSeries: &TimeSeries{}})
+		ReuseSlice(slice)
+
+		reused := PreallocTimeseriesV2SliceFromPool()
+		assert.Len(t, reused, 0)
+	})
+}
+
+func TestTimeseriesV2FromPool(t *testing.T) {
+	t.Run("new instance is provided when not available to reuse", func(t *testing.T) {
+		first := TimeseriesV2FromPool()
+		second := TimeseriesV2FromPool()
+
+		assert.NotSame(t, first, second)
+	})
+
+	t.Run("instance is cleaned before reusing", func(t *testing.T) {
+		ts := TimeseriesV2FromPool()
+		ts.LabelsRefs = []uint32{1, 2}
+		ts.Samples = []Sample{{Value: 1, Timestamp: 2}}
+		ts.Exemplars = []Exemplar{{LabelsRefs: []uint32{1, 2}, Value: 1, Timestamp: 2}}
+		ts.Histograms = []Histogram{{}}
+		ReuseTimeseries(ts)
+
+		reused := TimeseriesV2FromPool()
+		assert.Len(t, reused.LabelsRefs, 0)
+		assert.Len(t, reused.Samples, 0)
+		assert.Len(t, reused.Exemplars, 0)
+		assert.Len(t, reused.Histograms, 0)
+	})
+}
+
+func BenchmarkMarshallWriteRequest(b *testing.B) {
+	ts := PreallocTimeseriesV2SliceFromPool()
+
+	for i := 0; i < 100; i++ {
+		ts = append(ts, PreallocTimeseriesV2{TimeSeries: TimeseriesV2FromPool()})
+		ts[i].LabelsRefs = []uint32{1, 2, 3, 4, 5, 6, 7, 8}
+		ts[i].Samples = []Sample{{Value: 1, Timestamp: 2}}
+	}
+
+	tests := []struct {
+		name                string
+		writeRequestFactory func() proto.Marshaler
+		clean               func(in interface{})
+	}{
+		{
+			name: "no-pool",
+			writeRequestFactory: func() proto.Marshaler {
+				return &WriteRequest{Timeseries: ts}
+			},
+			clean: func(in interface{}) {},
+		},
+		{
+			name: "byte pool",
+			writeRequestFactory: func() proto.Marshaler {
+				w := &PreallocWriteRequestV2{}
+				w.Timeseries = ts
+				return w
+			},
+			clean: func(in interface{}) {
+				ReuseWriteRequestV2(in.(*PreallocWriteRequestV2))
+			},
+		},
+		{
+			name: "byte and write pool",
+			writeRequestFactory: func() proto.Marshaler {
+				w := PreallocWriteRequestV2FromPool()
+				w.Timeseries = ts
+				return w
+			},
+			clean: func(in interface{}) {
+				ReuseWriteRequestV2(in.(*PreallocWriteRequestV2))
+			},
+		},
+	}
+
+	for _, tc := range tests {
+		b.Run(tc.name, func(b *testing.B) {
+			b.ReportAllocs()
+			for i := 0; i < b.N; i++ {
+				w := tc.writeRequestFactory()
+				_, err := w.Marshal()
+				require.NoError(b, err)
+				tc.clean(w)
+			}
+		})
+	}
+}
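---

Usage note (not part of the patch): below is a minimal sketch of the intended pool lifecycle on the receive path, assuming a caller that gets a remote-write v2 payload as raw bytes. The function handleRemoteWriteV2 and the body parameter are hypothetical names for illustration; only the cortexpbv2 calls come from this change.

package example

import "github.com/cortexproject/cortex/pkg/cortexpbv2"

// handleRemoteWriteV2 is a hypothetical caller showing the get/use/return cycle.
func handleRemoteWriteV2(body []byte) error {
	// Take a pooled request. Its Unmarshal swaps in a pooled
	// []PreallocTimeseriesV2, and each element's Unmarshal draws its
	// *TimeSeries from timeSeriesPool.
	req := cortexpbv2.PreallocWriteRequestV2FromPool()
	if err := req.Unmarshal(body); err != nil {
		cortexpbv2.ReuseSlice(req.Timeseries)
		cortexpbv2.ReuseWriteRequestV2(req)
		return err
	}

	// ... process req.Symbols and req.Timeseries ...

	// Hand everything back: ReuseSlice resets each TimeSeries and returns the
	// slice to slicePool; ReuseWriteRequestV2 returns the request (and, after
	// a Marshal call, its byte buffer) to their pools.
	cortexpbv2.ReuseSlice(req.Timeseries)
	cortexpbv2.ReuseWriteRequestV2(req)
	return nil
}

Note the resets in ReuseTimeseries use [:0] truncation rather than fresh allocations, so the backing arrays survive the round-trip through sync.Pool and a reused TimeSeries can typically absorb the next series without growing.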