From 43e2818d3043a6d9ae3d8d6c5b98080fa994ff17 Mon Sep 17 00:00:00 2001 From: Marc Sanmiquel Date: Thu, 30 Jan 2025 14:12:15 +0100 Subject: [PATCH] refactor(pyroscope): switch to in-memory profile handling for fanout AppendIngest --- CHANGELOG.md | 2 + .../pyroscope/receive_http/receive_http.go | 39 +++++++------------ internal/component/pyroscope/write/write.go | 32 +++++---------- 3 files changed, 25 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ea2b042112..4a5cd74906 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,8 @@ Main (unreleased) - Add support for pushv1.PusherService Connect API in `pyroscope.receive_http`. (@simonswine) +- Change profile handling in `pyroscope.receive_http` and `pyroscope.write` components to use in-memory processing instead of pipes. (@marcsanmi) + v1.6.1 ----------------- diff --git a/internal/component/pyroscope/receive_http/receive_http.go b/internal/component/pyroscope/receive_http/receive_http.go index 8f17b54a85..1ef89c009e 100644 --- a/internal/component/pyroscope/receive_http/receive_http.go +++ b/internal/component/pyroscope/receive_http/receive_http.go @@ -1,6 +1,7 @@ package receive_http import ( + "bytes" "context" "errors" "fmt" @@ -239,47 +240,33 @@ func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) { } } - // Create a pipe for each appendable - pipeWriters := make([]io.Writer, len(appendables)) - pipeReaders := make([]io.Reader, len(appendables)) - for i := range appendables { - pr, pw := io.Pipe() - pipeReaders[i] = pr - pipeWriters[i] = pw + // Read the entire body into memory + // This matches how Append() handles profile data (as RawProfile), + // but means the entire profile will be held in memory + var buf bytes.Buffer + if _, err := io.Copy(&buf, r.Body); err != nil { + level.Error(c.opts.Logger).Log("msg", "Failed to read request body", "err", err) + w.WriteHeader(http.StatusInternalServerError) + return } - mw := io.MultiWriter(pipeWriters...) - // Create an errgroup with the timeout context g, ctx := errgroup.WithContext(r.Context()) - // Start copying the request body to all pipes - g.Go(func() error { - defer func() { - for _, pw := range pipeWriters { - pw.(io.WriteCloser).Close() - } - }() - _, err := io.Copy(mw, r.Body) - return err - }) - - // Process each appendable + // Process each appendable with a new reader from the buffer for i, appendable := range appendables { g.Go(func() error { - defer pipeReaders[i].(io.ReadCloser).Close() - profile := &pyroscope.IncomingProfile{ - Body: io.NopCloser(pipeReaders[i]), + Body: io.NopCloser(bytes.NewReader(buf.Bytes())), Headers: r.Header.Clone(), URL: r.URL, Labels: lbls, } - err := appendable.Appender().AppendIngest(ctx, profile) - if err != nil { + if err := appendable.Appender().AppendIngest(ctx, profile); err != nil { level.Error(c.opts.Logger).Log("msg", "Failed to append profile", "appendable", i, "err", err) return err } + level.Debug(c.opts.Logger).Log("msg", "Profile appended successfully", "appendable", i) return nil }) diff --git a/internal/component/pyroscope/write/write.go b/internal/component/pyroscope/write/write.go index 541b61d169..c089361d7d 100644 --- a/internal/component/pyroscope/write/write.go +++ b/internal/component/pyroscope/write/write.go @@ -1,6 +1,7 @@ package write import ( + "bytes" "context" "errors" "fmt" @@ -358,33 +359,20 @@ func (e *PyroscopeWriteError) Error() string { // AppendIngest implements the pyroscope.Appender interface. func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.IncomingProfile) error { - pipeWriters := make([]io.Writer, len(f.config.Endpoints)) - pipeReaders := make([]io.Reader, len(f.config.Endpoints)) - for i := range f.config.Endpoints { - pr, pw := io.Pipe() - pipeReaders[i] = pr - pipeWriters[i] = pw + // Read the entire body into memory + // This matches how Append() handles profile data (as RawProfile), + // but means the entire profile will be held in memory + var buf bytes.Buffer + if _, err := io.Copy(&buf, profile.Body); err != nil { + return fmt.Errorf("reading profile body: %w", err) } - mw := io.MultiWriter(pipeWriters...) + bodyBytes := buf.Bytes() g, ctx := errgroup.WithContext(ctx) - // Start copying the profile body to all pipes - g.Go(func() error { - defer func() { - for _, pw := range pipeWriters { - pw.(io.WriteCloser).Close() - } - }() - _, err := io.Copy(mw, profile.Body) - return err - }) - // Send to each endpoint concurrently - for i, endpoint := range f.config.Endpoints { + for _, endpoint := range f.config.Endpoints { g.Go(func() error { - defer pipeReaders[i].(io.ReadCloser).Close() - u, err := url.Parse(endpoint.URL) if err != nil { return fmt.Errorf("parse endpoint URL: %w", err) @@ -409,7 +397,7 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco } u.RawQuery = query.Encode() - req, err := http.NewRequestWithContext(ctx, "POST", u.String(), pipeReaders[i]) + req, err := http.NewRequestWithContext(ctx, "POST", u.String(), bytes.NewReader(bodyBytes)) if err != nil { return fmt.Errorf("create request: %w", err) }