Skip to content

Commit

Permalink
feat(output): HTTP Gzip compression (#31)
Browse files Browse the repository at this point in the history
* feat(output): HTTP Gzip compression

* feat(output): CompressionStrategy
  • Loading branch information
samcm authored Nov 30, 2022
1 parent 4cf52ab commit bf9ecd6
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 7 deletions.
8 changes: 8 additions & 0 deletions pkg/output/http/compression.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package http

type CompressionStrategy string

var (
CompressionStrategyNone CompressionStrategy = "none"
CompressionStrategyGzip CompressionStrategy = "gzip"
)
13 changes: 7 additions & 6 deletions pkg/output/http/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
)

type Config struct {
Address string `yaml:"address"`
Headers map[string]string `yaml:"headers"`
MaxQueueSize int `yaml:"max_queue_size" default:"51200"`
BatchTimeout time.Duration `yaml:"batch_timeout" default:"5s"`
ExportTimeout time.Duration `yaml:"export_timeout" default:"30s"`
MaxExportBatchSize int `yaml:"max_export_batch_size" default:"512"`
Address string `yaml:"address"`
Headers map[string]string `yaml:"headers"`
MaxQueueSize int `yaml:"max_queue_size" default:"51200"`
BatchTimeout time.Duration `yaml:"batch_timeout" default:"5s"`
ExportTimeout time.Duration `yaml:"export_timeout" default:"30s"`
MaxExportBatchSize int `yaml:"max_export_batch_size" default:"512"`
Compression CompressionStrategy `yaml:"compression" default:"none"`
}

func (c *Config) Validate() error {
Expand Down
33 changes: 32 additions & 1 deletion pkg/output/http/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -60,8 +61,18 @@ func (e *EventExporter) sendUpstream(ctx context.Context, events []*xatu.Decorat
body += string(eventAsJSON) + "\n"
}

buf := bytes.NewBufferString(body)
if e.config.Compression == CompressionStrategyGzip {
compressed, err := e.gzip(buf)
if err != nil {
return err
}

buf = compressed
}

// TODO: check that this also handles processor timeout
req, err := http.NewRequestWithContext(ctx, httpMethod, e.config.Address, bytes.NewBufferString(body))
req, err := http.NewRequestWithContext(ctx, httpMethod, e.config.Address, buf)
if err != nil {
return err
}
Expand All @@ -72,6 +83,10 @@ func (e *EventExporter) sendUpstream(ctx context.Context, events []*xatu.Decorat

req.Header.Set("Content-Type", "application/x-ndjson")

if e.config.Compression == CompressionStrategyGzip {
req.Header.Set("Content-Encoding", "gzip")
}

rsp, err = e.client.Do(req)
if err != nil {
return err
Expand All @@ -90,3 +105,19 @@ func (e *EventExporter) sendUpstream(ctx context.Context, events []*xatu.Decorat

return nil
}

func (e *EventExporter) gzip(in *bytes.Buffer) (*bytes.Buffer, error) {
out := &bytes.Buffer{}
g := gzip.NewWriter(out)

_, err := g.Write(in.Bytes())
if err != nil {
return out, err
}

if err := g.Close(); err != nil {
return out, err
}

return out, nil
}

0 comments on commit bf9ecd6

Please sign in to comment.