Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pyroscope): switch to in-memory profile handling for AppendIngest #2577

Merged
merged 4 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Main (unreleased)

- Fixes godeltaprof hiding (renaming `godeltaprof_*` profile names to regular ones). (@korniltsev)

- Change profile handling in `pyroscope.receive_http` and `pyroscope.write` components to use in-memory processing instead of pipes. (@marcsanmi)

v1.6.1
-----------------
Expand Down
29 changes: 26 additions & 3 deletions internal/component/pyroscope/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pyroscope

import (
"context"
"io"
"net/http"
"net/url"
"sync"
Expand Down Expand Up @@ -34,7 +33,7 @@ type RawSample struct {
}

type IncomingProfile struct {
Body io.ReadCloser
RawBody []byte
Headers http.Header
URL *url.URL
Labels labels.Labels
Expand Down Expand Up @@ -131,7 +130,15 @@ func (a *appender) AppendIngest(ctx context.Context, profile *IncomingProfile) e
}()
var multiErr error
for _, x := range a.children {
err := x.AppendIngest(ctx, profile)
// Create a copy for each child
profileCopy := &IncomingProfile{
RawBody: profile.RawBody, // []byte is immutable, safe to share
Headers: profile.Headers.Clone(),
URL: profile.URL, // URL is immutable once created
Labels: profile.Labels.Copy(),
}

err := x.AppendIngest(ctx, profileCopy)
if err != nil {
multiErr = multierror.Append(multiErr, err)
}
Expand All @@ -153,3 +160,19 @@ func (f AppendableFunc) AppendIngest(_ context.Context, _ *IncomingProfile) erro
// This is a no-op implementation
return nil
}

// For testing AppendIngest operations
type AppendableIngestFunc func(ctx context.Context, profile *IncomingProfile) error

func (f AppendableIngestFunc) Appender() Appender {
return f
}

func (f AppendableIngestFunc) AppendIngest(ctx context.Context, p *IncomingProfile) error {
return f(ctx, p)
}

func (f AppendableIngestFunc) Append(_ context.Context, _ labels.Labels, _ []*RawSample) error {
// This is a no-op implementation
return nil
}
49 changes: 49 additions & 0 deletions internal/component/pyroscope/appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,52 @@ func Test_FanOut(t *testing.T) {
require.Error(t, f.Appender().Append(context.Background(), lbls, []*RawSample{}))
require.Equal(t, int32(2), totalAppend.Load())
}

func Test_FanOut_AppendIngest(t *testing.T) {
totalAppend := atomic.NewInt32(0)
profile := &IncomingProfile{
RawBody: []byte("test"),
Labels: labels.Labels{{Name: "foo", Value: "bar"}},
}

f := NewFanout([]Appendable{
AppendableIngestFunc(func(_ context.Context, p *IncomingProfile) error {
require.Equal(t, profile.RawBody, p.RawBody)
require.Equal(t, profile.Labels, p.Labels)
totalAppend.Inc()
return nil
}),
AppendableIngestFunc(func(_ context.Context, p *IncomingProfile) error {
require.Equal(t, profile.RawBody, p.RawBody)
require.Equal(t, profile.Labels, p.Labels)
totalAppend.Inc()
return nil
}),
AppendableIngestFunc(func(_ context.Context, p *IncomingProfile) error {
require.Equal(t, profile.RawBody, p.RawBody)
require.Equal(t, profile.Labels, p.Labels)
totalAppend.Inc()
return errors.New("foo")
}),
}, "foo", prometheus.NewRegistry())
totalAppend.Store(0)
require.Error(t, f.Appender().AppendIngest(context.Background(), profile))
require.Equal(t, int32(3), totalAppend.Load())
f.UpdateChildren([]Appendable{
AppendableIngestFunc(func(_ context.Context, p *IncomingProfile) error {
require.Equal(t, profile.RawBody, p.RawBody)
require.Equal(t, profile.Labels, p.Labels)
totalAppend.Inc()
return nil
}),
AppendableIngestFunc(func(_ context.Context, p *IncomingProfile) error {
require.Equal(t, profile.RawBody, p.RawBody)
require.Equal(t, profile.Labels, p.Labels)
totalAppend.Inc()
return errors.New("bar")
}),
})
totalAppend.Store(0)
require.Error(t, f.Appender().AppendIngest(context.Background(), profile))
require.Equal(t, int32(2), totalAppend.Load())
}
39 changes: 13 additions & 26 deletions internal/component/pyroscope/receive_http/receive_http.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package receive_http

import (
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -239,47 +240,33 @@ func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) {
}
}

// Create a pipe for each appendable
pipeWriters := make([]io.Writer, len(appendables))
pipeReaders := make([]io.Reader, len(appendables))
for i := range appendables {
pr, pw := io.Pipe()
pipeReaders[i] = pr
pipeWriters[i] = pw
// Read the entire body into memory
// This matches how Append() handles profile data (as RawProfile),
// but means the entire profile will be held in memory
var buf bytes.Buffer
if _, err := io.Copy(&buf, r.Body); err != nil {
level.Error(c.opts.Logger).Log("msg", "Failed to read request body", "err", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
mw := io.MultiWriter(pipeWriters...)

// Create an errgroup with the timeout context
g, ctx := errgroup.WithContext(r.Context())

// Start copying the request body to all pipes
g.Go(func() error {
defer func() {
for _, pw := range pipeWriters {
pw.(io.WriteCloser).Close()
}
}()
_, err := io.Copy(mw, r.Body)
return err
})

// Process each appendable
// Process each appendable with a new reader from the buffer
for i, appendable := range appendables {
g.Go(func() error {
defer pipeReaders[i].(io.ReadCloser).Close()

profile := &pyroscope.IncomingProfile{
Body: io.NopCloser(pipeReaders[i]),
RawBody: buf.Bytes(),
Headers: r.Header.Clone(),
URL: r.URL,
Labels: lbls,
}

err := appendable.Appender().AppendIngest(ctx, profile)
if err != nil {
if err := appendable.Appender().AppendIngest(ctx, profile); err != nil {
level.Error(c.opts.Logger).Log("msg", "Failed to append profile", "appendable", i, "err", err)
return err
}

level.Debug(c.opts.Logger).Log("msg", "Profile appended successfully", "appendable", i)
return nil
})
Expand Down
16 changes: 2 additions & 14 deletions internal/component/pyroscope/receive_http/receive_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"crypto/rand"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"testing"
Expand Down Expand Up @@ -350,9 +349,7 @@ func verifyForwardedProfiles(

if testApp.lastProfile != nil {
// Verify profile body
body, err := io.ReadAll(testApp.lastProfile.Body)
require.NoError(t, err, "Failed to read profile body for appendable %d", i)
require.Equal(t, expectedProfile, body, "Profile mismatch for appendable %d", i)
require.Equal(t, expectedProfile, testApp.lastProfile.RawBody, "Profile mismatch for appendable %d", i)

// Verify headers
for key, value := range expectedHeaders {
Expand Down Expand Up @@ -486,22 +483,13 @@ func (a *testAppender) Append(_ context.Context, lbls labels.Labels, samples []*
}

func (a *testAppender) AppendIngest(_ context.Context, profile *pyroscope.IncomingProfile) error {
var buf bytes.Buffer
tee := io.TeeReader(profile.Body, &buf)

newProfile := &pyroscope.IncomingProfile{
Body: io.NopCloser(&buf),
RawBody: profile.RawBody,
Headers: profile.Headers,
URL: profile.URL,
Labels: profile.Labels,
}
a.lastProfile = newProfile

_, err := io.Copy(io.Discard, tee)
if err != nil {
return err
}

return a.appendErr
}

Expand Down
27 changes: 3 additions & 24 deletions internal/component/pyroscope/write/write.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package write

import (
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -358,33 +359,11 @@ func (e *PyroscopeWriteError) Error() string {

// AppendIngest implements the pyroscope.Appender interface.
func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.IncomingProfile) error {
pipeWriters := make([]io.Writer, len(f.config.Endpoints))
pipeReaders := make([]io.Reader, len(f.config.Endpoints))
for i := range f.config.Endpoints {
pr, pw := io.Pipe()
pipeReaders[i] = pr
pipeWriters[i] = pw
}
mw := io.MultiWriter(pipeWriters...)

g, ctx := errgroup.WithContext(ctx)

// Start copying the profile body to all pipes
g.Go(func() error {
defer func() {
for _, pw := range pipeWriters {
pw.(io.WriteCloser).Close()
}
}()
_, err := io.Copy(mw, profile.Body)
return err
})

// Send to each endpoint concurrently
for i, endpoint := range f.config.Endpoints {
for _, endpoint := range f.config.Endpoints {
g.Go(func() error {
defer pipeReaders[i].(io.ReadCloser).Close()

u, err := url.Parse(endpoint.URL)
if err != nil {
return fmt.Errorf("parse endpoint URL: %w", err)
Expand All @@ -409,7 +388,7 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco
}
u.RawQuery = query.Encode()

req, err := http.NewRequestWithContext(ctx, "POST", u.String(), pipeReaders[i])
req, err := http.NewRequestWithContext(ctx, "POST", u.String(), bytes.NewReader(profile.RawBody))
if err != nil {
return fmt.Errorf("create request: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions internal/component/pyroscope/write/write_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package write

import (
"bytes"
"context"
"errors"
"io"
Expand Down Expand Up @@ -344,7 +343,7 @@ func Test_Write_AppendIngest(t *testing.T) {
require.NotNil(t, export.Receiver, "Receiver is nil")

incomingProfile := &pyroscope.IncomingProfile{
Body: io.NopCloser(bytes.NewReader(testData)),
RawBody: testData,
Headers: http.Header{
"X-Test-Header": []string{"profile-value"}, // This should be overridden by endpoint
"X-Profile-Header": []string{"profile-value1", "profile-value2"}, // This should be preserved
Expand Down
Loading