Skip to content

Commit

Permalink
remove sync.Pool as requests are unbounded, update Prometheus deps
Browse files Browse the repository at this point in the history
  • Loading branch information
blind-oracle committed Jan 20, 2021
1 parent 2ecfde5 commit 5331145
Show file tree
Hide file tree
Showing 8 changed files with 736 additions and 112 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
cortex-tenant
sink
generate
config.yml
.out
*.rpm
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ Prometheus remote write proxy which marks timeseries with a Cortex tenant ID bas

Cortex tenants (separate namespaces where metrics are stored to and queried from) are identified by a `X-Scope-OrgID` HTTP header on both writes and queries.

This makes it impossible to use a single Prometheus (or an HA pair) to write to multiple tenants.
Problem is that Prometheus can't be configured to send this header. And even if it was possible to set it in the remote write configuration - it would be the same for all jobs. This makes it impossible to use a single Prometheus (or an HA pair) to write to multiple tenants.

This proxy solves the problem using the following logic:

- Receive Prometheus remote write
- Search each timeseries for a specific label name and extract a tenant ID from its value.
If the label wasn't not found then it can fall back to a configurable default ID.
If none is configured then the write request will be rejected.
- Optionally removes this label the timeseries
- Optionally removes this label from the timeseries
- Groups timeseries by tenant
- Issues a number of parallel per-tenant HTTP requests to Cortex adding the tenant HTTP header (`X-Scope-OrgID` by default)
- Issues a number of parallel per-tenant HTTP requests to Cortex with the relevant tenant HTTP header (`X-Scope-OrgID` by default)

## Usage

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.3.2
1.3.3
11 changes: 5 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@ require (
github.com/klauspost/compress v1.11.7 // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/common v0.15.0
github.com/prometheus/prometheus v2.5.0+incompatible
github.com/prometheus/prometheus v1.8.2-0.20210120113717-82330b96ee74
github.com/sirupsen/logrus v1.7.0
github.com/stretchr/testify v1.5.1
github.com/stretchr/testify v1.6.1
github.com/valyala/fasthttp v1.19.0
golang.org/x/net v0.0.0-20201224014010-6772e930b67b // indirect
golang.org/x/sys v0.0.0-20210113181707-4bcb84eeeb78 // indirect
golang.org/x/net v0.0.0-20210119194325-5f4716e94777 // indirect
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect
golang.org/x/text v0.3.5 // indirect
google.golang.org/genproto v0.0.0-20210114201628-6edceaf6022f // indirect
google.golang.org/genproto v0.0.0-20210119180700-e258113e47cc // indirect
google.golang.org/grpc v1.35.0 // indirect
google.golang.org/protobuf v1.25.0 // indirect
gopkg.in/yaml.v2 v2.4.0
)
727 changes: 689 additions & 38 deletions go.sum

Large diffs are not rendered by default.

21 changes: 0 additions & 21 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"flag"
"os"
"os/signal"
"sync"
"syscall"

"net/http"
Expand All @@ -13,28 +12,8 @@ import (
log "github.com/sirupsen/logrus"
)

const (
bufSize = 1024 * 128
)

type buffer struct {
b []byte
}

func (b *buffer) reset() {
b.b = b.b[:bufSize]
}

var (
version = "0.0.0"

bufferPool = sync.Pool{
New: func() interface{} {
return &buffer{
b: make([]byte, 0, bufSize),
}
},
}
)

func main() {
Expand Down
37 changes: 16 additions & 21 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ func (p *processor) handle(ctx *fh.RequestCtx) {
}

if len(wrReqIn.Timeseries) == 0 {
// If there's metadata - just accept the request and drop it
if len(wrReqIn.Metadata) > 0 {
return
}

ctx.Error("No timeseries found in the request", fh.StatusBadRequest)
return
}
Expand Down Expand Up @@ -151,12 +156,12 @@ func (p *processor) handle(ctx *fh.RequestCtx) {
return
}

func (p *processor) createWriteRequests(in *prompb.WriteRequest) (map[string]*prompb.WriteRequest, error) {
func (p *processor) createWriteRequests(wrReqIn *prompb.WriteRequest) (map[string]*prompb.WriteRequest, error) {
// Create per-tenant write requests
m := map[string]*prompb.WriteRequest{}

for _, ts := range in.Timeseries {
tenant, err := p.processTimeseries(ts)
for _, ts := range wrReqIn.Timeseries {
tenant, err := p.processTimeseries(&ts)
if err != nil {
return nil, err
}
Expand All @@ -174,11 +179,7 @@ func (p *processor) createWriteRequests(in *prompb.WriteRequest) (map[string]*pr
}

func (p *processor) unmarshal(b []byte) (*prompb.WriteRequest, error) {
buf := bufferPool.Get().(*buffer)
buf.reset()
defer bufferPool.Put(buf)

decoded, err := snappy.Decode(buf.b, b)
decoded, err := snappy.Decode(nil, b)
if err != nil {
return nil, errors.Wrap(err, "Unable to unpack Snappy")
}
Expand All @@ -191,19 +192,16 @@ func (p *processor) unmarshal(b []byte) (*prompb.WriteRequest, error) {
return req, nil
}

func (p *processor) marshal(wr *prompb.WriteRequest, bufDst []byte) (bufOut []byte, err error) {
buf := bufferPool.Get().(*buffer)
buf.reset()
defer bufferPool.Put(buf)
func (p *processor) marshal(wr *prompb.WriteRequest) (bufOut []byte, err error) {
b := make([]byte, wr.Size())

// Marshal to Protobuf
l, err := wr.MarshalTo(buf.b)
if err != nil {
if _, err = wr.MarshalTo(b); err != nil {
return
}

// Compress with Snappy
return snappy.Encode(bufDst, buf.b[:l]), nil
return snappy.Encode(nil, b), nil
}

func (p *processor) dispatch(clientIP net.Addr, reqID uuid.UUID, m map[string]*prompb.WriteRequest) (res []result) {
Expand Down Expand Up @@ -256,19 +254,16 @@ func (p *processor) processTimeseries(ts *prompb.TimeSeries) (tenant string, err
}

func (p *processor) send(clientIP net.Addr, reqID uuid.UUID, tenant string, wr *prompb.WriteRequest) (code int, body []byte, err error) {
buf := bufferPool.Get().(*buffer)
buf.reset()

req := fh.AcquireRequest()
resp := fh.AcquireResponse()

defer func() {
fh.ReleaseRequest(req)
fh.ReleaseResponse(resp)
bufferPool.Put(buf)
}()

if buf.b, err = p.marshal(wr, buf.b); err != nil {
buf, err := p.marshal(wr)
if err != nil {
return
}

Expand All @@ -281,7 +276,7 @@ func (p *processor) send(clientIP net.Addr, reqID uuid.UUID, tenant string, wr *
req.Header.Set(p.cfg.Tenant.Header, tenant)

req.SetRequestURI(p.cfg.Target)
req.SetBody(buf.b)
req.SetBody(buf)

if err = p.cli.DoTimeout(req, resp, p.cfg.Timeout); err != nil {
return
Expand Down
43 changes: 21 additions & 22 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ var (
Timestamp: 101112,
}

testTS1 = &prompb.TimeSeries{
Labels: []*prompb.Label{
&prompb.Label{
testTS1 = prompb.TimeSeries{
Labels: []prompb.Label{
prompb.Label{
Name: "__tenant__",
Value: "foobar",
},
Expand All @@ -55,9 +55,9 @@ var (
},
}

testTS2 = &prompb.TimeSeries{
Labels: []*prompb.Label{
&prompb.Label{
testTS2 = prompb.TimeSeries{
Labels: []prompb.Label{
prompb.Label{
Name: "__tenant__",
Value: "foobaz",
},
Expand All @@ -68,18 +68,18 @@ var (
},
}

testTS3 = &prompb.TimeSeries{
Labels: []*prompb.Label{
&prompb.Label{
testTS3 = prompb.TimeSeries{
Labels: []prompb.Label{
prompb.Label{
Name: "__tenantXXX",
Value: "foobaz",
},
},
}

testTS4 = &prompb.TimeSeries{
Labels: []*prompb.Label{
&prompb.Label{
testTS4 = prompb.TimeSeries{
Labels: []prompb.Label{
prompb.Label{
Name: "__tenant__",
Value: "foobaz",
},
Expand All @@ -91,20 +91,20 @@ var (
}

testWRQ = &prompb.WriteRequest{
Timeseries: []*prompb.TimeSeries{
Timeseries: []prompb.TimeSeries{
testTS1,
testTS2,
},
}

testWRQ1 = &prompb.WriteRequest{
Timeseries: []*prompb.TimeSeries{
Timeseries: []prompb.TimeSeries{
testTS1,
},
}

testWRQ2 = &prompb.WriteRequest{
Timeseries: []*prompb.TimeSeries{
Timeseries: []prompb.TimeSeries{
testTS2,
},
}
Expand Down Expand Up @@ -151,7 +151,7 @@ func Test_handle(t *testing.T) {
err = p.run()
assert.Nil(t, err)

buf, err := p.marshal(testWRQ, nil)
buf, err := p.marshal(testWRQ)
assert.Nil(t, err)

s := &fh.Server{
Expand Down Expand Up @@ -243,19 +243,19 @@ func Test_processTimeseries(t *testing.T) {
p := newProcessor(*cfg)
assert.Nil(t, err)

ten, err := p.processTimeseries(testTS4)
ten, err := p.processTimeseries(&testTS4)
assert.Nil(t, err)
assert.Equal(t, "foobaz", ten)

ten, err = p.processTimeseries(testTS3)
ten, err = p.processTimeseries(&testTS3)
assert.Nil(t, err)
assert.Equal(t, "default", ten)

cfg.Tenant.Default = ""
p = newProcessor(*cfg)
assert.Nil(t, err)

ten, err = p.processTimeseries(testTS3)
ten, err = p.processTimeseries(&testTS3)
assert.NotNil(t, err)
}

Expand All @@ -270,7 +270,7 @@ func Test_marshal(t *testing.T) {
assert.NotNil(t, err)

buf := make([]byte, 1024)
buf, err = p.marshal(testWRQ, buf)
buf, err = p.marshal(testWRQ)
assert.Nil(t, err)

wrq, err := p.unmarshal(buf)
Expand Down Expand Up @@ -299,11 +299,10 @@ func Test_createWriteRequests(t *testing.T) {

func Benchmark_marshal(b *testing.B) {
p, _ := createProcessor()
buf := make([]byte, 1024)

b.ResetTimer()
for i := 0; i < b.N; i++ {
buf, _ = p.marshal(testWRQ, buf)
buf, _ := p.marshal(testWRQ)
_, _ = p.unmarshal(buf)
}
}

0 comments on commit 5331145

Please sign in to comment.