Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support remote write v2 by converting request #6330

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/test-build-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ jobs:
- integration_querier
- integration_ruler
- integration_query_fuzz
- integration_remote_write_v2
steps:
- name: Upgrade golang
uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
Expand Down
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,4 @@ run:
- integration_querier
- integration_ruler
- integration_query_fuzz
- integration_remote_write_v2
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* [CHANGE] Enable Compactor and Alertmanager in target all. #6204
* [CHANGE] Update the `cortex_ingester_inflight_push_requests` metric to represent the maximum number of inflight requests recorded in the last minute. #6437
* [FEATURE] Ruler: Add an experimental flag `-ruler.query-response-format` to retrieve query response as a proto format. #6345
* [FEATURE] Support Prometheus remote write 2.0. #6330
* [FEATURE] Ruler: Pagination support for List Rules API. #6299
* [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527
* [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2639,6 +2639,11 @@ ha_tracker:
# CLI flag: -distributor.sign-write-requests
[sign_write_requests: <boolean> | default = false]

# EXPERIMENTAL: If true, accept prometheus remote write v2 protocol push
# request.
# CLI flag: -distributor.remote-write2-enabled
[remote_write2_enabled: <boolean> | default = false]

ring:
kvstore:
# Backend storage to use for the ring. Supported values are: consul, etcd,
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ Currently experimental features are:
- Distributor:
- Do not extend writes on unhealthy ingesters (`-distributor.extend-writes=false`)
- Accept multiple HA pairs in the same request (enabled via `-experimental.distributor.ha-tracker.mixed-ha-samples=true`)
- Accept Prometheus remote write 2.0 request (`-distributor.remote-write2-enabled=true`)
- Tenant Deletion in Purger, for blocks storage.
- Query-frontend: query stats tracking (`-frontend.query-stats-enabled`)
- Blocks storage bucket index
Expand Down
76 changes: 76 additions & 0 deletions integration/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
Expand Down Expand Up @@ -334,3 +335,78 @@ func CreateBlock(

return id, nil
}

func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries) {
tsMillis := TimeToMilliseconds(ts)

st := writev2.NewSymbolTable()

lbs := labels.Labels{labels.Label{Name: "__name__", Value: name}}
for _, lbl := range additionalLabels {
lbs = append(lbs, labels.Label{Name: lbl.Name, Value: lbl.Value})
}

var (
h *histogram.Histogram
fh *histogram.FloatHistogram
ph writev2.Histogram
)
if floatHistogram {
fh = tsdbutil.GenerateTestFloatHistogram(int(i))
ph = writev2.FromFloatHistogram(tsMillis, fh)
} else {
h = tsdbutil.GenerateTestHistogram(int(i))
ph = writev2.FromIntHistogram(tsMillis, h)
}

// Generate the series
series = append(series, writev2.TimeSeries{
LabelsRefs: st.SymbolizeLabels(lbs, nil),
Histograms: []writev2.Histogram{ph},
})

symbols = st.Symbols()

return
}

func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries, vector model.Vector) {
tsMillis := TimeToMilliseconds(ts)
value := rand.Float64()

st := writev2.NewSymbolTable()
lbs := labels.Labels{{Name: labels.MetricName, Value: name}}

for _, label := range additionalLabels {
lbs = append(lbs, labels.Label{
Name: label.Name,
Value: label.Value,
})
}
series = append(series, writev2.TimeSeries{
// Generate the series
LabelsRefs: st.SymbolizeLabels(lbs, nil),
Samples: []writev2.Sample{
{Value: value, Timestamp: tsMillis},
},
Metadata: writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
},
})
symbols = st.Symbols()

// Generate the expected vector when querying it
metric := model.Metric{}
metric[labels.MetricName] = model.LabelValue(name)
for _, lbl := range additionalLabels {
metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}

vector = append(vector, &model.Sample{
Metric: metric,
Value: model.SampleValue(value),
Timestamp: model.Time(tsMillis),
})

return
}
40 changes: 40 additions & 0 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
yaml "gopkg.in/yaml.v3"
Expand Down Expand Up @@ -147,6 +148,39 @@ func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
return res, nil
}

// PushV2 the input timeseries to the remote endpoint
func (c *Client) PushV2(symbols []string, timeseries []writev2.TimeSeries) (*http.Response, error) {
// Create write request
data, err := proto.Marshal(&writev2.Request{Symbols: symbols, Timeseries: timeseries})
if err != nil {
return nil, err
}

// Create HTTP request
compressed := snappy.Encode(nil, data)
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/push", c.distributorAddress), bytes.NewReader(compressed))
if err != nil {
return nil, err
}

req.Header.Add("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
req.Header.Set("X-Scope-OrgID", c.orgID)

ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

// Execute HTTP request
res, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}

defer res.Body.Close()
return res, nil
}

func getNameAndAttributes(ts prompb.TimeSeries) (string, map[string]any) {
var metricName string
attributes := make(map[string]any)
Expand Down Expand Up @@ -356,6 +390,12 @@ func (c *Client) Query(query string, ts time.Time) (model.Value, error) {
return value, err
}

// Metadata runs a metadata query
func (c *Client) Metadata(name, limit string) (map[string][]promv1.Metadata, error) {
metadata, err := c.querierClient.Metadata(context.Background(), name, limit)
return metadata, err
}

// QueryExemplars runs an exemplars query
func (c *Client) QueryExemplars(query string, start, end time.Time) ([]promv1.ExemplarQueryResult, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
Expand Down
Loading
Loading