diff --git a/CHANGELOG.md b/CHANGELOG.md index 92e9183a2d..eae6751448 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,7 @@ Main (unreleased) - Fixes godeltaprof hiding (renaming `godeltaprof_*` profile names to regular ones). (@korniltsev) +- Change profile handling in `pyroscope.receive_http` and `pyroscope.write` components to use in-memory processing instead of pipes. (@marcsanmi) v1.6.1 ----------------- diff --git a/internal/component/pyroscope/appender.go b/internal/component/pyroscope/appender.go index 28eb62795f..e0c27173f7 100644 --- a/internal/component/pyroscope/appender.go +++ b/internal/component/pyroscope/appender.go @@ -2,7 +2,6 @@ package pyroscope import ( "context" - "io" "net/http" "net/url" "sync" @@ -34,7 +33,7 @@ type RawSample struct { } type IncomingProfile struct { - Body io.ReadCloser + RawBody []byte Headers http.Header URL *url.URL Labels labels.Labels @@ -131,7 +130,15 @@ func (a *appender) AppendIngest(ctx context.Context, profile *IncomingProfile) e }() var multiErr error for _, x := range a.children { - err := x.AppendIngest(ctx, profile) + // Create a copy for each child + profileCopy := &IncomingProfile{ + RawBody: profile.RawBody, // []byte is immutable, safe to share + Headers: profile.Headers.Clone(), + URL: profile.URL, // URL is immutable once created + Labels: profile.Labels.Copy(), + } + + err := x.AppendIngest(ctx, profileCopy) if err != nil { multiErr = multierror.Append(multiErr, err) } @@ -153,3 +160,19 @@ func (f AppendableFunc) AppendIngest(_ context.Context, _ *IncomingProfile) erro // This is a no-op implementation return nil } + +// For testing AppendIngest operations +type AppendableIngestFunc func(ctx context.Context, profile *IncomingProfile) error + +func (f AppendableIngestFunc) Appender() Appender { + return f +} + +func (f AppendableIngestFunc) AppendIngest(ctx context.Context, p *IncomingProfile) error { + return f(ctx, p) +} + +func (f AppendableIngestFunc) Append(_ context.Context, _ labels.Labels, _ []*RawSample) error { + // This is a no-op implementation + return nil +} diff --git a/internal/component/pyroscope/appender_test.go b/internal/component/pyroscope/appender_test.go index 390014bb44..646ef8e33d 100644 --- a/internal/component/pyroscope/appender_test.go +++ b/internal/component/pyroscope/appender_test.go @@ -51,3 +51,52 @@ func Test_FanOut(t *testing.T) { require.Error(t, f.Appender().Append(context.Background(), lbls, []*RawSample{})) require.Equal(t, int32(2), totalAppend.Load()) } + +func Test_FanOut_AppendIngest(t *testing.T) { + totalAppend := atomic.NewInt32(0) + profile := &IncomingProfile{ + RawBody: []byte("test"), + Labels: labels.Labels{{Name: "foo", Value: "bar"}}, + } + + f := NewFanout([]Appendable{ + AppendableIngestFunc(func(_ context.Context, p *IncomingProfile) error { + require.Equal(t, profile.RawBody, p.RawBody) + require.Equal(t, profile.Labels, p.Labels) + totalAppend.Inc() + return nil + }), + AppendableIngestFunc(func(_ context.Context, p *IncomingProfile) error { + require.Equal(t, profile.RawBody, p.RawBody) + require.Equal(t, profile.Labels, p.Labels) + totalAppend.Inc() + return nil + }), + AppendableIngestFunc(func(_ context.Context, p *IncomingProfile) error { + require.Equal(t, profile.RawBody, p.RawBody) + require.Equal(t, profile.Labels, p.Labels) + totalAppend.Inc() + return errors.New("foo") + }), + }, "foo", prometheus.NewRegistry()) + totalAppend.Store(0) + require.Error(t, f.Appender().AppendIngest(context.Background(), profile)) + require.Equal(t, int32(3), totalAppend.Load()) + f.UpdateChildren([]Appendable{ + AppendableIngestFunc(func(_ context.Context, p *IncomingProfile) error { + require.Equal(t, profile.RawBody, p.RawBody) + require.Equal(t, profile.Labels, p.Labels) + totalAppend.Inc() + return nil + }), + AppendableIngestFunc(func(_ context.Context, p *IncomingProfile) error { + require.Equal(t, profile.RawBody, p.RawBody) + require.Equal(t, profile.Labels, p.Labels) + totalAppend.Inc() + return errors.New("bar") + }), + }) + totalAppend.Store(0) + require.Error(t, f.Appender().AppendIngest(context.Background(), profile)) + require.Equal(t, int32(2), totalAppend.Load()) +} diff --git a/internal/component/pyroscope/receive_http/receive_http.go b/internal/component/pyroscope/receive_http/receive_http.go index 8f17b54a85..8951bb85d7 100644 --- a/internal/component/pyroscope/receive_http/receive_http.go +++ b/internal/component/pyroscope/receive_http/receive_http.go @@ -1,6 +1,7 @@ package receive_http import ( + "bytes" "context" "errors" "fmt" @@ -239,47 +240,33 @@ func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) { } } - // Create a pipe for each appendable - pipeWriters := make([]io.Writer, len(appendables)) - pipeReaders := make([]io.Reader, len(appendables)) - for i := range appendables { - pr, pw := io.Pipe() - pipeReaders[i] = pr - pipeWriters[i] = pw + // Read the entire body into memory + // This matches how Append() handles profile data (as RawProfile), + // but means the entire profile will be held in memory + var buf bytes.Buffer + if _, err := io.Copy(&buf, r.Body); err != nil { + level.Error(c.opts.Logger).Log("msg", "Failed to read request body", "err", err) + w.WriteHeader(http.StatusInternalServerError) + return } - mw := io.MultiWriter(pipeWriters...) - // Create an errgroup with the timeout context g, ctx := errgroup.WithContext(r.Context()) - // Start copying the request body to all pipes - g.Go(func() error { - defer func() { - for _, pw := range pipeWriters { - pw.(io.WriteCloser).Close() - } - }() - _, err := io.Copy(mw, r.Body) - return err - }) - - // Process each appendable + // Process each appendable with a new reader from the buffer for i, appendable := range appendables { g.Go(func() error { - defer pipeReaders[i].(io.ReadCloser).Close() - profile := &pyroscope.IncomingProfile{ - Body: io.NopCloser(pipeReaders[i]), + RawBody: buf.Bytes(), Headers: r.Header.Clone(), URL: r.URL, Labels: lbls, } - err := appendable.Appender().AppendIngest(ctx, profile) - if err != nil { + if err := appendable.Appender().AppendIngest(ctx, profile); err != nil { level.Error(c.opts.Logger).Log("msg", "Failed to append profile", "appendable", i, "err", err) return err } + level.Debug(c.opts.Logger).Log("msg", "Profile appended successfully", "appendable", i) return nil }) diff --git a/internal/component/pyroscope/receive_http/receive_http_test.go b/internal/component/pyroscope/receive_http/receive_http_test.go index 46a3098124..f1711c89f1 100644 --- a/internal/component/pyroscope/receive_http/receive_http_test.go +++ b/internal/component/pyroscope/receive_http/receive_http_test.go @@ -6,7 +6,6 @@ import ( "crypto/rand" "errors" "fmt" - "io" "net/http" "net/url" "testing" @@ -350,9 +349,7 @@ func verifyForwardedProfiles( if testApp.lastProfile != nil { // Verify profile body - body, err := io.ReadAll(testApp.lastProfile.Body) - require.NoError(t, err, "Failed to read profile body for appendable %d", i) - require.Equal(t, expectedProfile, body, "Profile mismatch for appendable %d", i) + require.Equal(t, expectedProfile, testApp.lastProfile.RawBody, "Profile mismatch for appendable %d", i) // Verify headers for key, value := range expectedHeaders { @@ -486,22 +483,13 @@ func (a *testAppender) Append(_ context.Context, lbls labels.Labels, samples []* } func (a *testAppender) AppendIngest(_ context.Context, profile *pyroscope.IncomingProfile) error { - var buf bytes.Buffer - tee := io.TeeReader(profile.Body, &buf) - newProfile := &pyroscope.IncomingProfile{ - Body: io.NopCloser(&buf), + RawBody: profile.RawBody, Headers: profile.Headers, URL: profile.URL, Labels: profile.Labels, } a.lastProfile = newProfile - - _, err := io.Copy(io.Discard, tee) - if err != nil { - return err - } - return a.appendErr } diff --git a/internal/component/pyroscope/write/write.go b/internal/component/pyroscope/write/write.go index 541b61d169..5079a18f3e 100644 --- a/internal/component/pyroscope/write/write.go +++ b/internal/component/pyroscope/write/write.go @@ -1,6 +1,7 @@ package write import ( + "bytes" "context" "errors" "fmt" @@ -358,33 +359,11 @@ func (e *PyroscopeWriteError) Error() string { // AppendIngest implements the pyroscope.Appender interface. func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.IncomingProfile) error { - pipeWriters := make([]io.Writer, len(f.config.Endpoints)) - pipeReaders := make([]io.Reader, len(f.config.Endpoints)) - for i := range f.config.Endpoints { - pr, pw := io.Pipe() - pipeReaders[i] = pr - pipeWriters[i] = pw - } - mw := io.MultiWriter(pipeWriters...) - g, ctx := errgroup.WithContext(ctx) - // Start copying the profile body to all pipes - g.Go(func() error { - defer func() { - for _, pw := range pipeWriters { - pw.(io.WriteCloser).Close() - } - }() - _, err := io.Copy(mw, profile.Body) - return err - }) - // Send to each endpoint concurrently - for i, endpoint := range f.config.Endpoints { + for _, endpoint := range f.config.Endpoints { g.Go(func() error { - defer pipeReaders[i].(io.ReadCloser).Close() - u, err := url.Parse(endpoint.URL) if err != nil { return fmt.Errorf("parse endpoint URL: %w", err) @@ -409,7 +388,7 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco } u.RawQuery = query.Encode() - req, err := http.NewRequestWithContext(ctx, "POST", u.String(), pipeReaders[i]) + req, err := http.NewRequestWithContext(ctx, "POST", u.String(), bytes.NewReader(profile.RawBody)) if err != nil { return fmt.Errorf("create request: %w", err) } diff --git a/internal/component/pyroscope/write/write_test.go b/internal/component/pyroscope/write/write_test.go index 17c8d30351..dcd3d67663 100644 --- a/internal/component/pyroscope/write/write_test.go +++ b/internal/component/pyroscope/write/write_test.go @@ -1,7 +1,6 @@ package write import ( - "bytes" "context" "errors" "io" @@ -344,7 +343,7 @@ func Test_Write_AppendIngest(t *testing.T) { require.NotNil(t, export.Receiver, "Receiver is nil") incomingProfile := &pyroscope.IncomingProfile{ - Body: io.NopCloser(bytes.NewReader(testData)), + RawBody: testData, Headers: http.Header{ "X-Test-Header": []string{"profile-value"}, // This should be overridden by endpoint "X-Profile-Header": []string{"profile-value1", "profile-value2"}, // This should be preserved