diff --git a/.github/workflows/test-build-deploy.yml b/.github/workflows/test-build-deploy.yml index 9e9c0499d6..1b00e76968 100644 --- a/.github/workflows/test-build-deploy.yml +++ b/.github/workflows/test-build-deploy.yml @@ -162,6 +162,7 @@ jobs: - integration_querier - integration_ruler - integration_query_fuzz + - integration_remote_write_v2 steps: - name: Upgrade golang uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0 diff --git a/.golangci.yml b/.golangci.yml index dbfe02e837..e5336badfd 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -49,3 +49,4 @@ run: - integration_querier - integration_ruler - integration_query_fuzz + - integration_remote_write_v2 diff --git a/CHANGELOG.md b/CHANGELOG.md index e3c309dae9..11b27c2c5b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ * [CHANGE] Enable Compactor and Alertmanager in target all. #6204 * [CHANGE] Update the `cortex_ingester_inflight_push_requests` metric to represent the maximum number of inflight requests recorded in the last minute. #6437 * [FEATURE] Ruler: Add an experimental flag `-ruler.query-response-format` to retrieve query response as a proto format. #6345 +* [FEATURE] Support Prometheus remote write 2.0. #6330 * [FEATURE] Ruler: Pagination support for List Rules API. #6299 * [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527 * [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 138705552e..087ee89d7c 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2639,6 +2639,11 @@ ha_tracker: # CLI flag: -distributor.sign-write-requests [sign_write_requests: | default = false] +# EXPERIMENTAL: If true, accept prometheus remote write v2 protocol push +# request. +# CLI flag: -distributor.remote-write2-enabled +[remote_write2_enabled: | default = false] + ring: kvstore: # Backend storage to use for the ring. Supported values are: consul, etcd, diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 2b46407986..e6fde14a11 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -59,6 +59,7 @@ Currently experimental features are: - Distributor: - Do not extend writes on unhealthy ingesters (`-distributor.extend-writes=false`) - Accept multiple HA pairs in the same request (enabled via `-experimental.distributor.ha-tracker.mixed-ha-samples=true`) + - Accept Prometheus remote write 2.0 request (`-distributor.remote-write2-enabled=true`) - Tenant Deletion in Purger, for blocks storage. - Query-frontend: query stats tracking (`-frontend.query-stats-enabled`) - Blocks storage bucket index diff --git a/integration/e2e/util.go b/integration/e2e/util.go index 141d043ab5..1177f8d33c 100644 --- a/integration/e2e/util.go +++ b/integration/e2e/util.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" + writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/tsdbutil" @@ -334,3 +335,78 @@ func CreateBlock( return id, nil } + +func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries) { + tsMillis := TimeToMilliseconds(ts) + + st := writev2.NewSymbolTable() + + lbs := labels.Labels{labels.Label{Name: "__name__", Value: name}} + for _, lbl := range additionalLabels { + lbs = append(lbs, labels.Label{Name: lbl.Name, Value: lbl.Value}) + } + + var ( + h *histogram.Histogram + fh *histogram.FloatHistogram + ph writev2.Histogram + ) + if floatHistogram { + fh = tsdbutil.GenerateTestFloatHistogram(int(i)) + ph = writev2.FromFloatHistogram(tsMillis, fh) + } else { + h = tsdbutil.GenerateTestHistogram(int(i)) + ph = writev2.FromIntHistogram(tsMillis, h) + } + + // Generate the series + series = append(series, writev2.TimeSeries{ + LabelsRefs: st.SymbolizeLabels(lbs, nil), + Histograms: []writev2.Histogram{ph}, + }) + + symbols = st.Symbols() + + return +} + +func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries, vector model.Vector) { + tsMillis := TimeToMilliseconds(ts) + value := rand.Float64() + + st := writev2.NewSymbolTable() + lbs := labels.Labels{{Name: labels.MetricName, Value: name}} + + for _, label := range additionalLabels { + lbs = append(lbs, labels.Label{ + Name: label.Name, + Value: label.Value, + }) + } + series = append(series, writev2.TimeSeries{ + // Generate the series + LabelsRefs: st.SymbolizeLabels(lbs, nil), + Samples: []writev2.Sample{ + {Value: value, Timestamp: tsMillis}, + }, + Metadata: writev2.Metadata{ + Type: writev2.Metadata_METRIC_TYPE_GAUGE, + }, + }) + symbols = st.Symbols() + + // Generate the expected vector when querying it + metric := model.Metric{} + metric[labels.MetricName] = model.LabelValue(name) + for _, lbl := range additionalLabels { + metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value) + } + + vector = append(vector, &model.Sample{ + Metric: metric, + Value: model.SampleValue(value), + Timestamp: model.Time(tsMillis), + }) + + return +} diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index bc53f4dc58..94787b1414 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -24,6 +24,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/rulefmt" "github.com/prometheus/prometheus/prompb" + writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/remote" yaml "gopkg.in/yaml.v3" @@ -147,6 +148,39 @@ func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) { return res, nil } +// PushV2 the input timeseries to the remote endpoint +func (c *Client) PushV2(symbols []string, timeseries []writev2.TimeSeries) (*http.Response, error) { + // Create write request + data, err := proto.Marshal(&writev2.Request{Symbols: symbols, Timeseries: timeseries}) + if err != nil { + return nil, err + } + + // Create HTTP request + compressed := snappy.Encode(nil, data) + req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/push", c.distributorAddress), bytes.NewReader(compressed)) + if err != nil { + return nil, err + } + + req.Header.Add("Content-Encoding", "snappy") + req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request") + req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0") + req.Header.Set("X-Scope-OrgID", c.orgID) + + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + // Execute HTTP request + res, err := c.httpClient.Do(req.WithContext(ctx)) + if err != nil { + return nil, err + } + + defer res.Body.Close() + return res, nil +} + func getNameAndAttributes(ts prompb.TimeSeries) (string, map[string]any) { var metricName string attributes := make(map[string]any) @@ -356,6 +390,12 @@ func (c *Client) Query(query string, ts time.Time) (model.Value, error) { return value, err } +// Metadata runs a metadata query +func (c *Client) Metadata(name, limit string) (map[string][]promv1.Metadata, error) { + metadata, err := c.querierClient.Metadata(context.Background(), name, limit) + return metadata, err +} + // QueryExemplars runs an exemplars query func (c *Client) QueryExemplars(query string, start, end time.Time) ([]promv1.ExemplarQueryResult, error) { ctx, cancel := context.WithTimeout(context.Background(), c.timeout) diff --git a/integration/remote_write_v2_test.go b/integration/remote_write_v2_test.go new file mode 100644 index 0000000000..8fba93d6ac --- /dev/null +++ b/integration/remote_write_v2_test.go @@ -0,0 +1,327 @@ +//go:build integration_remote_write_v2 +// +build integration_remote_write_v2 + +package integration + +import ( + "math/rand" + "net/http" + "path" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/prompb" + writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" + "github.com/prometheus/prometheus/tsdb/tsdbutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/integration/e2e" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/cortexproject/cortex/integration/e2ecortex" + "github.com/cortexproject/cortex/pkg/storage/tsdb" +) + +func TestIngesterRollingUpdate(t *testing.T) { + // test prometheus remote write 2.0 push, when -distributor.remote-write2-enabled is true and ingester use 1.18.1 image. + // remote write 2.0 push success but, response header values are set to "0". + const blockRangePeriod = 5 * time.Second + ingesterImage := "quay.io/cortexproject/cortex:v1.18.1" + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + consul := e2edb.NewConsulWithName("consul") + require.NoError(t, s.StartAndWaitReady(consul)) + + flags := mergeFlags( + AlertmanagerLocalFlags(), + map[string]string{ + "-store.engine": blocksStorageEngine, + "-blocks-storage.backend": "filesystem", + "-blocks-storage.tsdb.head-compaction-interval": "4m", + "-blocks-storage.bucket-store.sync-interval": "15m", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + "-querier.query-store-for-labels-enabled": "true", + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + "-blocks-storage.tsdb.enable-native-histograms": "true", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul.NetworkHTTPEndpoint(), + // Distributor. + "-distributor.replication-factor": "1", + // Store-gateway. + "-store-gateway.sharding-enabled": "false", + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + }, + ) + + distributorFlag := mergeFlags(flags, map[string]string{ + "-distributor.remote-write2-enabled": "true", + }) + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + path := path.Join(s.SharedDir(), "cortex-1") + + flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path}) + // Start Cortex replicas. + // Start all other services. + ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, ingesterImage) + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), distributorFlag, "") + storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-querier.store-gateway-addresses": storeGateway.NetworkGRPCEndpoint()}), "") + + require.NoError(t, s.StartAndWaitReady(querier, ingester, distributor, storeGateway)) + + // Wait until Cortex replicas have updated the ring state. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + now := time.Now() + + // series push + symbols1, series, expectedVector := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"}) + res, err := c.PushV2(symbols1, series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + testPushHeader(t, res.Header, "0", "0", "0") + + // sample + result, err := c.Query("test_series", now) + require.NoError(t, err) + assert.Equal(t, expectedVector, result.(model.Vector)) + + // metadata + metadata, err := c.Metadata("test_series", "") + require.NoError(t, err) + require.Equal(t, 1, len(metadata["test_series"])) + + // histogram + histogramIdx := rand.Uint32() + symbols2, histogramSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"}) + res, err = c.PushV2(symbols2, histogramSeries) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + testPushHeader(t, res.Header, "0", "0", "0") + + symbols3, histogramFloatSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"}) + res, err = c.PushV2(symbols3, histogramFloatSeries) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + testPushHeader(t, res.Header, "0", "0", "0") + + testHistogramTimestamp := now.Add(blockRangePeriod * 2) + expectedHistogram := tsdbutil.GenerateTestHistogram(int(histogramIdx)) + result, err = c.Query(`test_histogram`, testHistogramTimestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + v := result.(model.Vector) + require.Equal(t, 2, v.Len()) + for _, s := range v { + require.NotNil(t, s.Histogram) + require.Equal(t, float64(expectedHistogram.Count), float64(s.Histogram.Count)) + require.Equal(t, float64(expectedHistogram.Sum), float64(s.Histogram.Sum)) + } +} + +func TestIngest(t *testing.T) { + const blockRangePeriod = 5 * time.Second + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + consul := e2edb.NewConsulWithName("consul") + require.NoError(t, s.StartAndWaitReady(consul)) + + flags := mergeFlags( + AlertmanagerLocalFlags(), + map[string]string{ + "-store.engine": blocksStorageEngine, + "-blocks-storage.backend": "filesystem", + "-blocks-storage.tsdb.head-compaction-interval": "4m", + "-blocks-storage.bucket-store.sync-interval": "15m", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + "-querier.query-store-for-labels-enabled": "true", + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + "-blocks-storage.tsdb.enable-native-histograms": "true", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul.NetworkHTTPEndpoint(), + // Distributor. + "-distributor.replication-factor": "1", + "-distributor.remote-write2-enabled": "true", + // Store-gateway. + "-store-gateway.sharding-enabled": "false", + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + }, + ) + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + path := path.Join(s.SharedDir(), "cortex-1") + + flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path}) + // Start Cortex replicas. + cortex := e2ecortex.NewSingleBinary("cortex", flags, "") + require.NoError(t, s.StartAndWaitReady(cortex)) + + // Wait until Cortex replicas have updated the ring state. + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total")) + + c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + now := time.Now() + + // series push + symbols1, series, expectedVector := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"}) + res, err := c.PushV2(symbols1, series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + testPushHeader(t, res.Header, "1", "0", "0") + + // sample + result, err := c.Query("test_series", now) + require.NoError(t, err) + assert.Equal(t, expectedVector, result.(model.Vector)) + + // metadata + metadata, err := c.Metadata("test_series", "") + require.NoError(t, err) + require.Equal(t, 1, len(metadata["test_series"])) + + // histogram + histogramIdx := rand.Uint32() + symbols2, histogramSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"}) + res, err = c.PushV2(symbols2, histogramSeries) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + testPushHeader(t, res.Header, "0", "1", "0") + + symbols3, histogramFloatSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"}) + res, err = c.PushV2(symbols3, histogramFloatSeries) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + testPushHeader(t, res.Header, "0", "1", "0") + + testHistogramTimestamp := now.Add(blockRangePeriod * 2) + expectedHistogram := tsdbutil.GenerateTestHistogram(int(histogramIdx)) + result, err = c.Query(`test_histogram`, testHistogramTimestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + v := result.(model.Vector) + require.Equal(t, 2, v.Len()) + for _, s := range v { + require.NotNil(t, s.Histogram) + require.Equal(t, float64(expectedHistogram.Count), float64(s.Histogram.Count)) + require.Equal(t, float64(expectedHistogram.Sum), float64(s.Histogram.Sum)) + } +} + +func TestExemplar(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + consul := e2edb.NewConsulWithName("consul") + require.NoError(t, s.StartAndWaitReady(consul)) + + flags := mergeFlags( + AlertmanagerLocalFlags(), + map[string]string{ + "-store.engine": blocksStorageEngine, + "-blocks-storage.backend": "filesystem", + "-blocks-storage.tsdb.head-compaction-interval": "4m", + "-blocks-storage.bucket-store.sync-interval": "15m", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + "-querier.query-store-for-labels-enabled": "true", + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.tsdb.enable-native-histograms": "true", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul.NetworkHTTPEndpoint(), + "-ingester.max-exemplars": "100", + // Distributor. + "-distributor.replication-factor": "1", + "-distributor.remote-write2-enabled": "true", + // Store-gateway. + "-store-gateway.sharding-enabled": "false", + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + }, + ) + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + path := path.Join(s.SharedDir(), "cortex-1") + + flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path}) + // Start Cortex replicas. + cortex := e2ecortex.NewSingleBinary("cortex", flags, "") + require.NoError(t, s.StartAndWaitReady(cortex)) + + // Wait until Cortex replicas have updated the ring state. + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total")) + + c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + now := time.Now() + tsMillis := e2e.TimeToMilliseconds(now) + + symbols := []string{"", "__name__", "test_metric", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"} + timeseries := []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels + Metadata: writev2.Metadata{ + Type: writev2.Metadata_METRIC_TYPE_COUNTER, // writeV2RequestSeries1Metadata.Type. + + HelpRef: 15, // Symbolized writeV2RequestSeries1Metadata.Help. + UnitRef: 16, // Symbolized writeV2RequestSeries1Metadata.Unit. + }, + Samples: []writev2.Sample{{Value: 1, Timestamp: tsMillis}}, + Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: tsMillis}}, + }, + } + + res, err := c.PushV2(symbols, timeseries) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + testPushHeader(t, res.Header, "1", "0", "1") + + start := time.Now().Add(-time.Minute) + end := now.Add(time.Minute) + + exemplars, err := c.QueryExemplars("test_metric", start, end) + require.NoError(t, err) + require.Equal(t, 1, len(exemplars)) +} + +func testPushHeader(t *testing.T, header http.Header, expectedSamples, expectedHistogram, expectedExemplars string) { + require.Equal(t, expectedSamples, header.Get("X-Prometheus-Remote-Write-Samples-Written")) + require.Equal(t, expectedHistogram, header.Get("X-Prometheus-Remote-Write-Histograms-Written")) + require.Equal(t, expectedExemplars, header.Get("X-Prometheus-Remote-Write-Exemplars-Written")) +} diff --git a/pkg/api/api.go b/pkg/api/api.go index 13843c3e64..c8c9390f48 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -277,7 +277,7 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) { func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) { distributorpb.RegisterDistributorServer(a.server.GRPC, d) - a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") + a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWrite2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status") @@ -289,7 +289,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET") // Legacy Routes - a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") + a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWrite2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET") a.RegisterRoute("/ha-tracker", d.HATracker, false, "GET") } @@ -322,12 +322,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) { a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST") a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET") a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST") - a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. + a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWrite2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. // Legacy Routes a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST") a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST") - a.RegisterRoute("/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. + a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWrite2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. } func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) { diff --git a/pkg/cortexpb/cortex.pb.go b/pkg/cortexpb/cortex.pb.go index 3b63e15904..2b6c49330f 100644 --- a/pkg/cortexpb/cortex.pb.go +++ b/pkg/cortexpb/cortex.pb.go @@ -176,6 +176,12 @@ func (m *WriteRequest) GetSkipLabelNameValidation() bool { } type WriteResponse struct { + // Samples represents X-Prometheus-Remote-Write-Written-Samples + Samples int64 `protobuf:"varint,1,opt,name=Samples,proto3" json:"Samples,omitempty"` + // Histograms represents X-Prometheus-Remote-Write-Written-Histograms + Histograms int64 `protobuf:"varint,2,opt,name=Histograms,proto3" json:"Histograms,omitempty"` + // Exemplars represents X-Prometheus-Remote-Write-Written-Exemplars + Exemplars int64 `protobuf:"varint,3,opt,name=Exemplars,proto3" json:"Exemplars,omitempty"` } func (m *WriteResponse) Reset() { *m = WriteResponse{} } @@ -210,6 +216,27 @@ func (m *WriteResponse) XXX_DiscardUnknown() { var xxx_messageInfo_WriteResponse proto.InternalMessageInfo +func (m *WriteResponse) GetSamples() int64 { + if m != nil { + return m.Samples + } + return 0 +} + +func (m *WriteResponse) GetHistograms() int64 { + if m != nil { + return m.Histograms + } + return 0 +} + +func (m *WriteResponse) GetExemplars() int64 { + if m != nil { + return m.Exemplars + } + return 0 +} + type TimeSeries struct { Labels []LabelAdapter `protobuf:"bytes,1,rep,name=labels,proto3,customtype=LabelAdapter" json:"labels"` // Sorted by time, oldest sample first. @@ -842,72 +869,73 @@ func init() { func init() { proto.RegisterFile("cortex.proto", fileDescriptor_893a47d0a749d749) } var fileDescriptor_893a47d0a749d749 = []byte{ - // 1031 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x56, 0x4b, 0x6f, 0x23, 0x45, - 0x17, 0xed, 0x72, 0xfb, 0x79, 0x63, 0x3b, 0x3d, 0xf5, 0x45, 0x1f, 0xad, 0x48, 0xd3, 0x71, 0x1a, - 0x01, 0x16, 0x42, 0x01, 0x05, 0x01, 0x9a, 0x51, 0x84, 0x64, 0x0f, 0xce, 0x43, 0x33, 0x76, 0xa2, - 0xb2, 0xc3, 0x68, 0xd8, 0x58, 0x15, 0xa7, 0x12, 0xb7, 0xa6, 0x5f, 0x74, 0x95, 0xa3, 0x09, 0x2b, - 0x56, 0x88, 0x25, 0x6b, 0xb6, 0x6c, 0xf8, 0x05, 0xfc, 0x86, 0x2c, 0xb3, 0x1c, 0xb1, 0x88, 0x88, - 0xb3, 0x99, 0xe5, 0x2c, 0xf8, 0x01, 0xa8, 0xaa, 0x5f, 0xce, 0x84, 0x11, 0x9b, 0xd9, 0x55, 0x9d, - 0x7b, 0xcf, 0xbd, 0xa7, 0xea, 0x9e, 0x2e, 0x35, 0xd4, 0x27, 0x41, 0x24, 0xd8, 0x8b, 0x8d, 0x30, - 0x0a, 0x44, 0x80, 0xab, 0xf1, 0x2e, 0x3c, 0x5a, 0x5d, 0x39, 0x0d, 0x4e, 0x03, 0x05, 0x7e, 0x2a, - 0x57, 0x71, 0xdc, 0xfe, 0xa3, 0x00, 0xf5, 0xa7, 0x91, 0x23, 0x18, 0x61, 0xdf, 0xcf, 0x18, 0x17, - 0xf8, 0x00, 0x40, 0x38, 0x1e, 0xe3, 0x2c, 0x72, 0x18, 0x37, 0x51, 0x4b, 0x6f, 0x2f, 0x6d, 0xae, - 0x6c, 0xa4, 0x55, 0x36, 0x46, 0x8e, 0xc7, 0x86, 0x2a, 0xd6, 0x5d, 0xbd, 0xb8, 0x5a, 0xd3, 0xfe, - 0xbc, 0x5a, 0xc3, 0x07, 0x11, 0xa3, 0xae, 0x1b, 0x4c, 0x46, 0x19, 0x8f, 0x2c, 0xd4, 0xc0, 0x0f, - 0xa0, 0x3c, 0x0c, 0x66, 0xd1, 0x84, 0x99, 0x85, 0x16, 0x6a, 0x37, 0x37, 0xd7, 0xf3, 0x6a, 0x8b, - 0x9d, 0x37, 0xe2, 0xa4, 0x9e, 0x3f, 0xf3, 0x48, 0x42, 0xc0, 0x0f, 0xa1, 0xea, 0x31, 0x41, 0x8f, - 0xa9, 0xa0, 0xa6, 0xae, 0xa4, 0x98, 0x39, 0xb9, 0xcf, 0x44, 0xe4, 0x4c, 0xfa, 0x49, 0xbc, 0x5b, - 0xbc, 0xb8, 0x5a, 0x43, 0x24, 0xcb, 0xc7, 0x5b, 0xb0, 0xca, 0x9f, 0x3b, 0xe1, 0xd8, 0xa5, 0x47, - 0xcc, 0x1d, 0xfb, 0xd4, 0x63, 0xe3, 0x33, 0xea, 0x3a, 0xc7, 0x54, 0x38, 0x81, 0x6f, 0xbe, 0xaa, - 0xb4, 0x50, 0xbb, 0x4a, 0xde, 0x93, 0x29, 0x4f, 0x64, 0xc6, 0x80, 0x7a, 0xec, 0xdb, 0x2c, 0x6e, - 0xaf, 0x01, 0xe4, 0x7a, 0x70, 0x05, 0xf4, 0xce, 0xc1, 0x9e, 0xa1, 0xe1, 0x2a, 0x14, 0xc9, 0xe1, - 0x93, 0x9e, 0x81, 0xec, 0x65, 0x68, 0x24, 0xea, 0x79, 0x18, 0xf8, 0x9c, 0xd9, 0x7f, 0x23, 0x80, - 0xfc, 0x76, 0x70, 0x07, 0xca, 0xaa, 0x73, 0x7a, 0x87, 0xff, 0xcb, 0x85, 0xab, 0x7e, 0x07, 0xd4, - 0x89, 0xba, 0x2b, 0xc9, 0x15, 0xd6, 0x15, 0xd4, 0x39, 0xa6, 0xa1, 0x60, 0x11, 0x49, 0x88, 0xf8, - 0x33, 0xa8, 0x70, 0xea, 0x85, 0x2e, 0xe3, 0x66, 0x41, 0xd5, 0x30, 0xf2, 0x1a, 0x43, 0x15, 0x50, - 0x87, 0xd6, 0x48, 0x9a, 0x86, 0xbf, 0x84, 0x1a, 0x7b, 0xc1, 0xbc, 0xd0, 0xa5, 0x11, 0x4f, 0x2e, - 0x0c, 0xe7, 0x9c, 0x5e, 0x12, 0x4a, 0x58, 0x79, 0x2a, 0x7e, 0x00, 0x30, 0x75, 0xb8, 0x08, 0x4e, - 0x23, 0xea, 0x71, 0xb3, 0xf8, 0xa6, 0xe0, 0xdd, 0x34, 0x96, 0x30, 0x17, 0x92, 0xed, 0x2f, 0xa0, - 0x96, 0x9d, 0x07, 0x63, 0x28, 0xca, 0x8b, 0x36, 0x51, 0x0b, 0xb5, 0xeb, 0x44, 0xad, 0xf1, 0x0a, - 0x94, 0xce, 0xa8, 0x3b, 0x8b, 0xa7, 0x5f, 0x27, 0xf1, 0xc6, 0xee, 0x40, 0x39, 0x3e, 0x42, 0x1e, - 0x97, 0x24, 0x94, 0xc4, 0xf1, 0x3a, 0xd4, 0x95, 0x85, 0x04, 0xf5, 0xc2, 0xb1, 0xc7, 0x15, 0x59, - 0x27, 0x4b, 0x19, 0xd6, 0xe7, 0xf6, 0xaf, 0x05, 0x68, 0xde, 0xf6, 0x00, 0xfe, 0x0a, 0x8a, 0xe2, - 0x3c, 0x8c, 0x4b, 0x35, 0x37, 0xdf, 0x7f, 0x9b, 0x57, 0x92, 0xed, 0xe8, 0x3c, 0x64, 0x44, 0x11, - 0xf0, 0x27, 0x80, 0x3d, 0x85, 0x8d, 0x4f, 0xa8, 0xe7, 0xb8, 0xe7, 0xca, 0x2f, 0xaa, 0x69, 0x8d, - 0x18, 0x71, 0x64, 0x5b, 0x05, 0xa4, 0x4d, 0xe4, 0x31, 0xa7, 0xcc, 0x0d, 0xcd, 0xa2, 0x8a, 0xab, - 0xb5, 0xc4, 0x66, 0xbe, 0x23, 0xcc, 0x52, 0x8c, 0xc9, 0xb5, 0x7d, 0x0e, 0x90, 0x77, 0xc2, 0x4b, - 0x50, 0x39, 0x1c, 0x3c, 0x1e, 0xec, 0x3f, 0x1d, 0x18, 0x9a, 0xdc, 0x3c, 0xda, 0x3f, 0x1c, 0x8c, - 0x7a, 0xc4, 0x40, 0xb8, 0x06, 0xa5, 0x9d, 0xce, 0xe1, 0x4e, 0xcf, 0x28, 0xe0, 0x06, 0xd4, 0x76, - 0xf7, 0x86, 0xa3, 0xfd, 0x1d, 0xd2, 0xe9, 0x1b, 0x3a, 0xc6, 0xd0, 0x54, 0x91, 0x1c, 0x2b, 0x4a, - 0xea, 0xf0, 0xb0, 0xdf, 0xef, 0x90, 0x67, 0x46, 0x49, 0x1a, 0x72, 0x6f, 0xb0, 0xbd, 0x6f, 0x94, - 0x71, 0x1d, 0xaa, 0xc3, 0x51, 0x67, 0xd4, 0x1b, 0xf6, 0x46, 0x46, 0xc5, 0x7e, 0x0c, 0xe5, 0xb8, - 0xf5, 0x3b, 0x30, 0xa2, 0xfd, 0x13, 0x82, 0x6a, 0x6a, 0x9e, 0x77, 0x61, 0xec, 0x5b, 0x96, 0x78, - 0xeb, 0xc8, 0xf5, 0xbb, 0x23, 0xbf, 0x2c, 0x41, 0x2d, 0x33, 0x23, 0xbe, 0x0f, 0xb5, 0x49, 0x30, - 0xf3, 0xc5, 0xd8, 0xf1, 0x85, 0x1a, 0x79, 0x71, 0x57, 0x23, 0x55, 0x05, 0xed, 0xf9, 0x02, 0xaf, - 0xc3, 0x52, 0x1c, 0x3e, 0x71, 0x03, 0x2a, 0xe2, 0x5e, 0xbb, 0x1a, 0x01, 0x05, 0x6e, 0x4b, 0x0c, - 0x1b, 0xa0, 0xf3, 0x99, 0xa7, 0x3a, 0x21, 0x22, 0x97, 0xf8, 0xff, 0x50, 0xe6, 0x93, 0x29, 0xf3, - 0xa8, 0x1a, 0xee, 0x3d, 0x92, 0xec, 0xf0, 0x07, 0xd0, 0xfc, 0x81, 0x45, 0xc1, 0x58, 0x4c, 0x23, - 0xc6, 0xa7, 0x81, 0x7b, 0xac, 0x06, 0x8d, 0x48, 0x43, 0xa2, 0xa3, 0x14, 0xc4, 0x1f, 0x26, 0x69, - 0xb9, 0xae, 0xb2, 0xd2, 0x85, 0x48, 0x5d, 0xe2, 0x8f, 0x52, 0x6d, 0x1f, 0x83, 0xb1, 0x90, 0x17, - 0x0b, 0xac, 0x28, 0x81, 0x88, 0x34, 0xb3, 0xcc, 0x58, 0x64, 0x07, 0x9a, 0x3e, 0x3b, 0xa5, 0xc2, - 0x39, 0x63, 0x63, 0x1e, 0x52, 0x9f, 0x9b, 0xd5, 0x37, 0x5f, 0xe5, 0xee, 0x6c, 0xf2, 0x9c, 0x89, - 0x61, 0x48, 0xfd, 0xe4, 0x0b, 0x6d, 0xa4, 0x0c, 0x89, 0x71, 0xfc, 0x11, 0x2c, 0x67, 0x25, 0x8e, - 0x99, 0x2b, 0x28, 0x37, 0x6b, 0x2d, 0xbd, 0x8d, 0x49, 0x56, 0xf9, 0x1b, 0x85, 0xde, 0x4a, 0x54, - 0xda, 0xb8, 0x09, 0x2d, 0xbd, 0x8d, 0xf2, 0x44, 0x25, 0x4c, 0x3e, 0x6f, 0xcd, 0x30, 0xe0, 0xce, - 0x82, 0xa8, 0xa5, 0xff, 0x16, 0x95, 0x32, 0x32, 0x51, 0x59, 0x89, 0x44, 0x54, 0x3d, 0x16, 0x95, - 0xc2, 0xb9, 0xa8, 0x2c, 0x31, 0x11, 0xd5, 0x88, 0x45, 0xa5, 0x70, 0x22, 0x6a, 0x0b, 0x20, 0x62, - 0x9c, 0x89, 0xf1, 0x54, 0xde, 0x7c, 0x53, 0x3d, 0x02, 0xf7, 0xff, 0xe5, 0x19, 0xdb, 0x20, 0x32, - 0x6b, 0xd7, 0xf1, 0x05, 0xa9, 0x45, 0xe9, 0xf2, 0x8e, 0xff, 0x96, 0xef, 0xfa, 0xef, 0x21, 0xd4, - 0x32, 0xea, 0xed, 0xef, 0xb9, 0x02, 0xfa, 0xb3, 0xde, 0xd0, 0x40, 0xb8, 0x0c, 0x85, 0xc1, 0xbe, - 0x51, 0xc8, 0xbf, 0x69, 0x7d, 0xb5, 0xf8, 0xf3, 0x6f, 0x16, 0xea, 0x56, 0xa0, 0xa4, 0xc4, 0x77, - 0xeb, 0x00, 0xf9, 0xec, 0xed, 0x2d, 0x80, 0xfc, 0xa2, 0xa4, 0xfd, 0x82, 0x93, 0x13, 0xce, 0x62, - 0x3f, 0xdf, 0x23, 0xc9, 0x4e, 0xe2, 0x2e, 0xf3, 0x4f, 0xc5, 0x54, 0xd9, 0xb8, 0x41, 0x92, 0x5d, - 0xf7, 0xeb, 0xcb, 0x6b, 0x4b, 0x7b, 0x79, 0x6d, 0x69, 0xaf, 0xaf, 0x2d, 0xf4, 0xe3, 0xdc, 0x42, - 0xbf, 0xcf, 0x2d, 0x74, 0x31, 0xb7, 0xd0, 0xe5, 0xdc, 0x42, 0x7f, 0xcd, 0x2d, 0xf4, 0x6a, 0x6e, - 0x69, 0xaf, 0xe7, 0x16, 0xfa, 0xe5, 0xc6, 0xd2, 0x2e, 0x6f, 0x2c, 0xed, 0xe5, 0x8d, 0xa5, 0x7d, - 0x97, 0xfd, 0x14, 0x1c, 0x95, 0xd5, 0x5f, 0xc0, 0xe7, 0xff, 0x04, 0x00, 0x00, 0xff, 0xff, 0x4b, - 0xb6, 0xdb, 0xd4, 0x35, 0x08, 0x00, 0x00, + // 1055 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x56, 0x4d, 0x6f, 0x1b, 0x45, + 0x18, 0xde, 0xf1, 0xfa, 0xf3, 0x8d, 0xed, 0x6e, 0x87, 0x08, 0x56, 0x11, 0xdd, 0x24, 0x8b, 0x00, + 0x0b, 0xa1, 0x80, 0x82, 0x00, 0xb5, 0x8a, 0x90, 0xec, 0xe2, 0x7c, 0xa8, 0xb5, 0x13, 0x8d, 0x1d, + 0xaa, 0x72, 0xb1, 0x26, 0xce, 0xc4, 0x5e, 0x75, 0xbf, 0xd8, 0x19, 0x47, 0x0d, 0x27, 0x4e, 0x88, + 0x23, 0x67, 0xae, 0x5c, 0xf8, 0x05, 0xfc, 0x86, 0x1c, 0x73, 0xac, 0x38, 0x44, 0xc4, 0xb9, 0xf4, + 0xd8, 0x03, 0x3f, 0x00, 0xcd, 0xec, 0x97, 0xd3, 0x50, 0x71, 0xe9, 0x6d, 0xe6, 0x79, 0xdf, 0xe7, + 0x9d, 0x67, 0xde, 0xf7, 0xd9, 0xd1, 0x42, 0x7d, 0x1c, 0x44, 0x82, 0x3d, 0xdf, 0x08, 0xa3, 0x40, + 0x04, 0xb8, 0x1a, 0xef, 0xc2, 0xa3, 0x95, 0xe5, 0x49, 0x30, 0x09, 0x14, 0xf8, 0x99, 0x5c, 0xc5, + 0x71, 0xfb, 0xcf, 0x02, 0xd4, 0x9f, 0x44, 0x8e, 0x60, 0x84, 0xfd, 0x30, 0x63, 0x5c, 0xe0, 0x03, + 0x00, 0xe1, 0x78, 0x8c, 0xb3, 0xc8, 0x61, 0xdc, 0x44, 0x6b, 0x7a, 0x6b, 0x69, 0x73, 0x79, 0x23, + 0xad, 0xb2, 0x31, 0x74, 0x3c, 0x36, 0x50, 0xb1, 0xce, 0xca, 0xf9, 0xe5, 0xaa, 0xf6, 0xd7, 0xe5, + 0x2a, 0x3e, 0x88, 0x18, 0x75, 0xdd, 0x60, 0x3c, 0xcc, 0x78, 0x64, 0xa1, 0x06, 0xbe, 0x0f, 0xe5, + 0x41, 0x30, 0x8b, 0xc6, 0xcc, 0x2c, 0xac, 0xa1, 0x56, 0x73, 0x73, 0x3d, 0xaf, 0xb6, 0x78, 0xf2, + 0x46, 0x9c, 0xd4, 0xf5, 0x67, 0x1e, 0x49, 0x08, 0xf8, 0x01, 0x54, 0x3d, 0x26, 0xe8, 0x31, 0x15, + 0xd4, 0xd4, 0x95, 0x14, 0x33, 0x27, 0xf7, 0x98, 0x88, 0x9c, 0x71, 0x2f, 0x89, 0x77, 0x8a, 0xe7, + 0x97, 0xab, 0x88, 0x64, 0xf9, 0x78, 0x0b, 0x56, 0xf8, 0x33, 0x27, 0x1c, 0xb9, 0xf4, 0x88, 0xb9, + 0x23, 0x9f, 0x7a, 0x6c, 0x74, 0x4a, 0x5d, 0xe7, 0x98, 0x0a, 0x27, 0xf0, 0xcd, 0x97, 0x95, 0x35, + 0xd4, 0xaa, 0x92, 0xf7, 0x64, 0xca, 0x63, 0x99, 0xd1, 0xa7, 0x1e, 0xfb, 0x2e, 0x8b, 0xdb, 0xab, + 0x00, 0xb9, 0x1e, 0x5c, 0x01, 0xbd, 0x7d, 0xb0, 0x67, 0x68, 0xb8, 0x0a, 0x45, 0x72, 0xf8, 0xb8, + 0x6b, 0x20, 0x7b, 0x02, 0x8d, 0x44, 0x3d, 0x0f, 0x03, 0x9f, 0x33, 0x6c, 0x42, 0x65, 0x40, 0xbd, + 0xd0, 0x55, 0x5d, 0x43, 0x2d, 0x9d, 0xa4, 0x5b, 0x6c, 0x01, 0xec, 0x3a, 0x5c, 0x04, 0x93, 0x88, + 0x7a, 0x5c, 0x35, 0x41, 0x27, 0x0b, 0x08, 0x7e, 0x1f, 0x6a, 0xdd, 0xe7, 0xcc, 0x0b, 0x5d, 0x1a, + 0x71, 0x53, 0x57, 0xe1, 0x1c, 0xb0, 0xff, 0x41, 0x00, 0x79, 0xd7, 0x71, 0x1b, 0xca, 0xea, 0x46, + 0xe9, 0x6c, 0xde, 0xc9, 0x1b, 0xa2, 0xee, 0x71, 0x40, 0x9d, 0xa8, 0xb3, 0x9c, 0x8c, 0xa6, 0xae, + 0xa0, 0xf6, 0x31, 0x0d, 0x05, 0x8b, 0x48, 0x42, 0xc4, 0x9f, 0x43, 0x85, 0x27, 0x4a, 0x0b, 0xaa, + 0x86, 0x91, 0xd7, 0x88, 0x35, 0xab, 0x66, 0x6a, 0x24, 0x4d, 0xc3, 0x5f, 0x41, 0x8d, 0x2d, 0x28, + 0x94, 0x1c, 0x9c, 0x73, 0x52, 0xad, 0x09, 0x2b, 0x4f, 0xc5, 0xf7, 0x01, 0xa6, 0xf9, 0xcd, 0x8b, + 0xaf, 0x0b, 0xce, 0x7a, 0x90, 0x30, 0x17, 0x92, 0xed, 0x2f, 0xa1, 0x96, 0xdd, 0x07, 0x63, 0x28, + 0xca, 0x01, 0xaa, 0xc6, 0xd6, 0x89, 0x5a, 0xe3, 0x65, 0x28, 0x9d, 0x52, 0x77, 0x16, 0xbb, 0xaa, + 0x4e, 0xe2, 0x8d, 0xdd, 0x86, 0x72, 0x7c, 0x85, 0x3c, 0x2e, 0x49, 0x28, 0x89, 0xe3, 0x75, 0xa8, + 0x2b, 0x6b, 0x0a, 0xea, 0x85, 0xa3, 0x6c, 0x1a, 0x4b, 0x19, 0xd6, 0xe3, 0xf6, 0x6f, 0x05, 0x68, + 0xde, 0xf4, 0x16, 0xfe, 0x1a, 0x8a, 0xe2, 0x2c, 0x8c, 0x4b, 0x35, 0x37, 0x3f, 0x78, 0x93, 0x07, + 0x93, 0xed, 0xf0, 0x2c, 0x64, 0x44, 0x11, 0xf0, 0xa7, 0x80, 0x3d, 0x85, 0x8d, 0x4e, 0xa8, 0xe7, + 0xb8, 0x67, 0xca, 0x87, 0xea, 0xd0, 0x1a, 0x31, 0xe2, 0xc8, 0xb6, 0x0a, 0x48, 0xfb, 0xc9, 0x6b, + 0x4e, 0x99, 0x1b, 0x9a, 0x45, 0x15, 0x57, 0x6b, 0x89, 0xcd, 0x7c, 0x47, 0x98, 0xa5, 0x18, 0x93, + 0x6b, 0xfb, 0x0c, 0x20, 0x3f, 0x09, 0x2f, 0x41, 0xe5, 0xb0, 0xff, 0xa8, 0xbf, 0xff, 0xa4, 0x6f, + 0x68, 0x72, 0xf3, 0x70, 0xff, 0xb0, 0x3f, 0xec, 0x12, 0x03, 0xe1, 0x1a, 0x94, 0x76, 0xda, 0x87, + 0x3b, 0x5d, 0xa3, 0x80, 0x1b, 0x50, 0xdb, 0xdd, 0x1b, 0x0c, 0xf7, 0x77, 0x48, 0xbb, 0x67, 0xe8, + 0x18, 0x43, 0x53, 0x45, 0x72, 0xac, 0x28, 0xa9, 0x83, 0xc3, 0x5e, 0xaf, 0x4d, 0x9e, 0x1a, 0x25, + 0x69, 0xf4, 0xbd, 0xfe, 0xf6, 0xbe, 0x51, 0xc6, 0x75, 0xa8, 0x0e, 0x86, 0xed, 0x61, 0x77, 0xd0, + 0x1d, 0x1a, 0x15, 0xfb, 0x11, 0x94, 0xe3, 0xa3, 0xdf, 0x82, 0x11, 0xed, 0x9f, 0x11, 0x54, 0x53, + 0xf3, 0xbc, 0x0d, 0x63, 0xdf, 0xb0, 0xc4, 0x1b, 0x47, 0xae, 0xdf, 0x1e, 0xf9, 0x45, 0x09, 0x6a, + 0x99, 0x19, 0xf1, 0x3d, 0xa8, 0x8d, 0x83, 0x99, 0x2f, 0x46, 0x8e, 0x2f, 0xd4, 0xc8, 0x8b, 0xbb, + 0x1a, 0xa9, 0x2a, 0x68, 0xcf, 0x17, 0x78, 0x1d, 0x96, 0xe2, 0xf0, 0x89, 0x1b, 0x50, 0x11, 0x9f, + 0xb5, 0xab, 0x11, 0x50, 0xe0, 0xb6, 0xc4, 0xb0, 0x01, 0x3a, 0x9f, 0x79, 0xea, 0x24, 0x44, 0xe4, + 0x12, 0xbf, 0x0b, 0x65, 0x3e, 0x9e, 0x32, 0x8f, 0xaa, 0xe1, 0xde, 0x25, 0xc9, 0x0e, 0x7f, 0x08, + 0xcd, 0x1f, 0x59, 0x14, 0x8c, 0xc4, 0x34, 0x62, 0x7c, 0x1a, 0xb8, 0xc7, 0x6a, 0xd0, 0x88, 0x34, + 0x24, 0x3a, 0x4c, 0x41, 0xfc, 0x51, 0x92, 0x96, 0xeb, 0x2a, 0x2b, 0x5d, 0x88, 0xd4, 0x25, 0xfe, + 0x30, 0xd5, 0xf6, 0x09, 0x18, 0x0b, 0x79, 0xb1, 0xc0, 0x8a, 0x12, 0x88, 0x48, 0x33, 0xcb, 0x8c, + 0x45, 0xb6, 0xa1, 0xe9, 0xb3, 0x09, 0x15, 0xce, 0x29, 0x1b, 0xf1, 0x90, 0xfa, 0xdc, 0xac, 0xbe, + 0xfe, 0xda, 0x77, 0x66, 0xe3, 0x67, 0x4c, 0x0c, 0x42, 0xea, 0x27, 0x5f, 0x68, 0x23, 0x65, 0x48, + 0x8c, 0xe3, 0x8f, 0xe1, 0x4e, 0x56, 0xe2, 0x98, 0xb9, 0x82, 0x72, 0xb3, 0xb6, 0xa6, 0xb7, 0x30, + 0xc9, 0x2a, 0x7f, 0xab, 0xd0, 0x1b, 0x89, 0x4a, 0x1b, 0x37, 0x61, 0x4d, 0x6f, 0xa1, 0x3c, 0x51, + 0x09, 0x93, 0xcf, 0x5b, 0x33, 0x0c, 0xb8, 0xb3, 0x20, 0x6a, 0xe9, 0xff, 0x45, 0xa5, 0x8c, 0x4c, + 0x54, 0x56, 0x22, 0x11, 0x55, 0x8f, 0x45, 0xa5, 0x70, 0x2e, 0x2a, 0x4b, 0x4c, 0x44, 0x35, 0x62, + 0x51, 0x29, 0x9c, 0x88, 0xda, 0x02, 0x88, 0x18, 0x67, 0x62, 0x34, 0x95, 0x9d, 0x6f, 0xaa, 0x47, + 0xe0, 0xde, 0x7f, 0x3c, 0x63, 0x1b, 0x44, 0x66, 0xed, 0x3a, 0xbe, 0x20, 0xb5, 0x28, 0x5d, 0xde, + 0xf2, 0xdf, 0x9d, 0xdb, 0xfe, 0x7b, 0x00, 0xb5, 0x8c, 0x7a, 0xf3, 0x7b, 0xae, 0x80, 0xfe, 0xb4, + 0x3b, 0x30, 0x10, 0x2e, 0x43, 0xa1, 0xbf, 0x6f, 0x14, 0xf2, 0x6f, 0x5a, 0x5f, 0x29, 0xfe, 0xf2, + 0xbb, 0x85, 0x3a, 0x15, 0x28, 0x29, 0xf1, 0x9d, 0x3a, 0x40, 0x3e, 0x7b, 0x7b, 0x0b, 0x20, 0x6f, + 0x94, 0xb4, 0x5f, 0x70, 0x72, 0xc2, 0x59, 0xec, 0xe7, 0xbb, 0x24, 0xd9, 0x49, 0xdc, 0x65, 0xfe, + 0x44, 0x4c, 0x95, 0x8d, 0x1b, 0x24, 0xd9, 0x75, 0xbe, 0xb9, 0xb8, 0xb2, 0xb4, 0x17, 0x57, 0x96, + 0xf6, 0xea, 0xca, 0x42, 0x3f, 0xcd, 0x2d, 0xf4, 0xc7, 0xdc, 0x42, 0xe7, 0x73, 0x0b, 0x5d, 0xcc, + 0x2d, 0xf4, 0xf7, 0xdc, 0x42, 0x2f, 0xe7, 0x96, 0xf6, 0x6a, 0x6e, 0xa1, 0x5f, 0xaf, 0x2d, 0xed, + 0xe2, 0xda, 0xd2, 0x5e, 0x5c, 0x5b, 0xda, 0xf7, 0xd9, 0xcf, 0xc6, 0x51, 0x59, 0xfd, 0x5d, 0x7c, + 0xf1, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x1c, 0xa8, 0x5c, 0x4e, 0x8d, 0x08, 0x00, 0x00, } func (x WriteRequest_SourceEnum) String() string { @@ -993,6 +1021,15 @@ func (this *WriteResponse) Equal(that interface{}) bool { } else if this == nil { return false } + if this.Samples != that1.Samples { + return false + } + if this.Histograms != that1.Histograms { + return false + } + if this.Exemplars != that1.Exemplars { + return false + } return true } func (this *TimeSeries) Equal(that interface{}) bool { @@ -1443,8 +1480,11 @@ func (this *WriteResponse) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 4) + s := make([]string, 0, 7) s = append(s, "&cortexpb.WriteResponse{") + s = append(s, "Samples: "+fmt.Sprintf("%#v", this.Samples)+",\n") + s = append(s, "Histograms: "+fmt.Sprintf("%#v", this.Histograms)+",\n") + s = append(s, "Exemplars: "+fmt.Sprintf("%#v", this.Exemplars)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -1713,6 +1753,21 @@ func (m *WriteResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Exemplars != 0 { + i = encodeVarintCortex(dAtA, i, uint64(m.Exemplars)) + i-- + dAtA[i] = 0x18 + } + if m.Histograms != 0 { + i = encodeVarintCortex(dAtA, i, uint64(m.Histograms)) + i-- + dAtA[i] = 0x10 + } + if m.Samples != 0 { + i = encodeVarintCortex(dAtA, i, uint64(m.Samples)) + i-- + dAtA[i] = 0x8 + } return len(dAtA) - i, nil } @@ -2277,6 +2332,15 @@ func (m *WriteResponse) Size() (n int) { } var l int _ = l + if m.Samples != 0 { + n += 1 + sovCortex(uint64(m.Samples)) + } + if m.Histograms != 0 { + n += 1 + sovCortex(uint64(m.Histograms)) + } + if m.Exemplars != 0 { + n += 1 + sovCortex(uint64(m.Exemplars)) + } return n } @@ -2547,6 +2611,9 @@ func (this *WriteResponse) String() string { return "nil" } s := strings.Join([]string{`&WriteResponse{`, + `Samples:` + fmt.Sprintf("%v", this.Samples) + `,`, + `Histograms:` + fmt.Sprintf("%v", this.Histograms) + `,`, + `Exemplars:` + fmt.Sprintf("%v", this.Exemplars) + `,`, `}`, }, "") return s @@ -2916,6 +2983,63 @@ func (m *WriteResponse) Unmarshal(dAtA []byte) error { return fmt.Errorf("proto: WriteResponse: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Samples", wireType) + } + m.Samples = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Samples |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Histograms", wireType) + } + m.Histograms = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Histograms |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Exemplars", wireType) + } + m.Exemplars = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Exemplars |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipCortex(dAtA[iNdEx:]) diff --git a/pkg/cortexpb/cortex.proto b/pkg/cortexpb/cortex.proto index cedb173183..be40afc4bf 100644 --- a/pkg/cortexpb/cortex.proto +++ b/pkg/cortexpb/cortex.proto @@ -21,7 +21,14 @@ message WriteRequest { bool skip_label_name_validation = 1000; //set intentionally high to keep WriteRequest compatible with upstream Prometheus } -message WriteResponse {} +message WriteResponse { + // Samples represents X-Prometheus-Remote-Write-Written-Samples + int64 Samples = 1; + // Histograms represents X-Prometheus-Remote-Write-Written-Histograms + int64 Histograms = 2; + // Exemplars represents X-Prometheus-Remote-Write-Written-Exemplars + int64 Exemplars = 3; +} message TimeSeries { repeated LabelPair labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "LabelAdapter"]; diff --git a/pkg/cortexpb/histograms.go b/pkg/cortexpb/histograms.go index 60e7207a19..d05dbaa772 100644 --- a/pkg/cortexpb/histograms.go +++ b/pkg/cortexpb/histograms.go @@ -16,6 +16,7 @@ package cortexpb import ( "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/prompb" + writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" ) func (h Histogram) IsFloatHistogram() bool { @@ -23,6 +24,30 @@ func (h Histogram) IsFloatHistogram() bool { return ok } +func HistogramWriteV2ProtoToHistogramProto(h writev2.Histogram) Histogram { + ph := Histogram{ + Sum: h.Sum, + Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + NegativeSpans: spansWriteV2ProtoToSpansProto(h.NegativeSpans), + NegativeDeltas: h.NegativeDeltas, + NegativeCounts: h.NegativeCounts, + PositiveSpans: spansWriteV2ProtoToSpansProto(h.PositiveSpans), + PositiveDeltas: h.PositiveDeltas, + PositiveCounts: h.PositiveCounts, + ResetHint: Histogram_ResetHint(h.ResetHint), + TimestampMs: h.Timestamp, + } + if h.IsFloatHistogram() { + ph.Count = &Histogram_CountFloat{CountFloat: h.GetCountFloat()} + ph.ZeroCount = &Histogram_ZeroCountFloat{ZeroCountFloat: h.GetZeroCountFloat()} + } else { + ph.Count = &Histogram_CountInt{CountInt: h.GetCountInt()} + ph.ZeroCount = &Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()} + } + return ph +} + // HistogramPromProtoToHistogramProto converts a prometheus protobuf Histogram to cortex protobuf Histogram. func HistogramPromProtoToHistogramProto(h prompb.Histogram) Histogram { ph := Histogram{ @@ -155,3 +180,12 @@ func spansPromProtoToSpansProto(s []prompb.BucketSpan) []BucketSpan { return spans } + +func spansWriteV2ProtoToSpansProto(s []writev2.BucketSpan) []BucketSpan { + spans := make([]BucketSpan, len(s)) + for i := 0; i < len(s); i++ { + spans[i] = BucketSpan{Offset: s[i].Offset, Length: s[i].Length} + } + + return spans +} diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index bdeeabebeb..9b713204d4 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -150,6 +150,7 @@ type Config struct { ShardByAllLabels bool `yaml:"shard_by_all_labels"` ExtendWrites bool `yaml:"extend_writes"` SignWriteRequestsEnabled bool `yaml:"sign_write_requests"` + RemoteWrite2Enabled bool `yaml:"remote_write2_enabled"` // Distributors ring DistributorRing RingConfig `yaml:"ring"` @@ -208,6 +209,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.ExtendWrites, "distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.") f.BoolVar(&cfg.ZoneResultsQuorumMetadata, "distributor.zone-results-quorum-metadata", false, "Experimental, this flag may change in the future. If zone awareness and this both enabled, when querying metadata APIs (labels names and values for now), only results from quorum number of zones will be included.") f.IntVar(&cfg.NumPushWorkers, "distributor.num-push-workers", 0, "EXPERIMENTAL: Number of go routines to handle push calls from distributors to ingesters. When no workers are available, a new goroutine will be spawned automatically. If set to 0 (default), workers are disabled, and a new goroutine will be created for each push request.") + f.BoolVar(&cfg.RemoteWrite2Enabled, "distributor.remote-write2-enabled", false, "EXPERIMENTAL: If true, accept prometheus remote write v2 protocol push request.") f.Float64Var(&cfg.InstanceLimits.MaxIngestionRate, "distributor.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.") f.IntVar(&cfg.InstanceLimits.MaxInflightPushRequests, "distributor.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.") @@ -793,12 +795,21 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co keys := append(seriesKeys, metadataKeys...) initialMetadataIndex := len(seriesKeys) - err = d.doBatch(ctx, req, subRing, keys, initialMetadataIndex, validatedMetadata, validatedTimeseries, userID) + ws := WriteStats{} + + err = d.doBatch(ctx, req, subRing, keys, initialMetadataIndex, validatedMetadata, validatedTimeseries, userID, &ws) if err != nil { return nil, err } - return &cortexpb.WriteResponse{}, firstPartialErr + resp := &cortexpb.WriteResponse{} + if d.cfg.RemoteWrite2Enabled { + resp.Samples = ws.LoadSamples() + resp.Histograms = ws.LoadHistogram() + resp.Exemplars = ws.LoadExemplars() + } + + return resp, firstPartialErr } func (d *Distributor) updateLabelSetMetrics() { @@ -860,7 +871,7 @@ func (d *Distributor) cleanStaleIngesterMetrics() { } } -func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, subRing ring.ReadRing, keys []uint32, initialMetadataIndex int, validatedMetadata []*cortexpb.MetricMetadata, validatedTimeseries []cortexpb.PreallocTimeseries, userID string) error { +func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, subRing ring.ReadRing, keys []uint32, initialMetadataIndex int, validatedMetadata []*cortexpb.MetricMetadata, validatedTimeseries []cortexpb.PreallocTimeseries, userID string, ws *WriteStats) error { span, _ := opentracing.StartSpanFromContext(ctx, "doBatch") defer span.Finish() @@ -895,7 +906,7 @@ func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, s } } - return d.send(localCtx, ingester, timeseries, metadata, req.Source) + return d.send(localCtx, ingester, timeseries, metadata, req.Source, ws) }, func() { cortexpb.ReuseSlice(req.Timeseries) cancel() @@ -1121,7 +1132,7 @@ func sortLabelsIfNeeded(labels []cortexpb.LabelAdapter) { }) } -func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, timeseries []cortexpb.PreallocTimeseries, metadata []*cortexpb.MetricMetadata, source cortexpb.WriteRequest_SourceEnum) error { +func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, timeseries []cortexpb.PreallocTimeseries, metadata []*cortexpb.MetricMetadata, source cortexpb.WriteRequest_SourceEnum, ws *WriteStats) error { h, err := d.ingesterPool.GetClientFor(ingester.Addr) if err != nil { return err @@ -1142,7 +1153,7 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time d.inflightClientRequests.Inc() defer d.inflightClientRequests.Dec() - _, err = c.PushPreAlloc(ctx, req) + resp, err := c.PushPreAlloc(ctx, req) // We should not reuse the req in case of errors: // See: https://github.com/grpc/grpc-go/issues/6355 @@ -1163,6 +1174,13 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time } } + if resp != nil { + // track write stats + ws.SetSamples(resp.Samples) + ws.SetHistograms(resp.Histograms) + ws.SetExemplars(resp.Exemplars) + } + return err } diff --git a/pkg/distributor/write_stats.go b/pkg/distributor/write_stats.go new file mode 100644 index 0000000000..0f7fbc332d --- /dev/null +++ b/pkg/distributor/write_stats.go @@ -0,0 +1,62 @@ +package distributor + +import ( + "go.uber.org/atomic" +) + +type WriteStats struct { + // Samples represents X-Prometheus-Remote-Write-Written-Samples + Samples atomic.Int64 + // Histograms represents X-Prometheus-Remote-Write-Written-Histograms + Histograms atomic.Int64 + // Exemplars represents X-Prometheus-Remote-Write-Written-Exemplars + Exemplars atomic.Int64 +} + +func (w *WriteStats) SetSamples(samples int64) { + if w == nil { + return + } + + w.Samples.Store(samples) +} + +func (w *WriteStats) SetHistograms(histograms int64) { + if w == nil { + return + } + + w.Histograms.Store(histograms) +} + +func (w *WriteStats) SetExemplars(exemplars int64) { + if w == nil { + return + } + + w.Exemplars.Store(exemplars) +} + +func (w *WriteStats) LoadSamples() int64 { + if w == nil { + return 0 + } + + return w.Samples.Load() +} + +func (w *WriteStats) LoadHistogram() int64 { + if w == nil { + return 0 + } + + return w.Histograms.Load() +} + +func (w *WriteStats) LoadExemplars() int64 { + if w == nil { + return 0 + } + + return w.Exemplars.Load() +} diff --git a/pkg/distributor/write_stats_test.go b/pkg/distributor/write_stats_test.go new file mode 100644 index 0000000000..523f16788f --- /dev/null +++ b/pkg/distributor/write_stats_test.go @@ -0,0 +1,41 @@ +package distributor + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_SetAndLoad(t *testing.T) { + ws := &WriteStats{} + + t.Run("Samples", func(t *testing.T) { + ws.SetSamples(3) + assert.Equal(t, int64(3), ws.LoadSamples()) + }) + t.Run("Histograms", func(t *testing.T) { + ws.SetHistograms(10) + assert.Equal(t, int64(10), ws.LoadHistogram()) + }) + t.Run("Exemplars", func(t *testing.T) { + ws.SetExemplars(2) + assert.Equal(t, int64(2), ws.LoadExemplars()) + }) +} + +func Test_NilReceiver(t *testing.T) { + var ws *WriteStats + + t.Run("Samples", func(t *testing.T) { + ws.SetSamples(3) + assert.Equal(t, int64(0), ws.LoadSamples()) + }) + t.Run("Histograms", func(t *testing.T) { + ws.SetHistograms(10) + assert.Equal(t, int64(0), ws.LoadHistogram()) + }) + t.Run("Exemplars", func(t *testing.T) { + ws.SetExemplars(2) + assert.Equal(t, int64(0), ws.LoadExemplars()) + }) +} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 2502ef8c76..2ddb81dba6 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1504,7 +1504,13 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte return &cortexpb.WriteResponse{}, httpgrpc.Errorf(code, wrapWithUser(firstPartialErr, userID).Error()) } - return &cortexpb.WriteResponse{}, nil + writeResponse := &cortexpb.WriteResponse{ + Samples: int64(succeededSamplesCount), + Histograms: int64(succeededHistogramsCount), + Exemplars: int64(succeededExemplarsCount), + } + + return writeResponse, nil } func (u *userTSDB) acquireAppendLock() error { diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index 9cabb39522..24e3097470 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -2,22 +2,45 @@ package push import ( "context" + "fmt" "net/http" + "strconv" + "strings" "github.com/go-kit/log/level" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model/labels" + writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" + "github.com/prometheus/prometheus/storage/remote" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/middleware" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/extract" "github.com/cortexproject/cortex/pkg/util/log" ) +const ( + remoteWriteVersionHeader = "X-Prometheus-Remote-Write-Version" + remoteWriteVersion1HeaderValue = "0.1.0" + remoteWriteVersion20HeaderValue = "2.0.0" + appProtoContentType = "application/x-protobuf" + appProtoV1ContentType = "application/x-protobuf;proto=prometheus.WriteRequest" + appProtoV2ContentType = "application/x-protobuf;proto=io.prometheus.write.v2.Request" + + rw20WrittenSamplesHeader = "X-Prometheus-Remote-Write-Samples-Written" + rw20WrittenHistogramsHeader = "X-Prometheus-Remote-Write-Histograms-Written" + rw20WrittenExemplarsHeader = "X-Prometheus-Remote-Write-Exemplars-Written" + + errMsgNotEnabledPRW2 = "Not enabled prometheus remote write v2 push request" +) + // Func defines the type of the push. It is similar to http.HandlerFunc. type Func func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) // Handler is a http.Handler which accepts WriteRequests. -func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler { +func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() logger := log.WithContext(ctx, log.Logger) @@ -28,31 +51,238 @@ func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push F logger = log.WithSourceIPs(source, logger) } } - var req cortexpb.PreallocWriteRequest - err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy) + + // follow Prometheus https://github.com/prometheus/prometheus/blob/main/storage/remote/write_handler.go + contentType := r.Header.Get("Content-Type") + if contentType == "" { + contentType = appProtoContentType + } + + msgType, err := parseProtoMsg(contentType) if err != nil { - level.Error(logger).Log("err", err.Error()) - http.Error(w, err.Error(), http.StatusBadRequest) + level.Error(logger).Log("Error decoding remote write request", "err", err) + http.Error(w, err.Error(), http.StatusUnsupportedMediaType) return } - req.SkipLabelNameValidation = false - if req.Source == 0 { - req.Source = cortexpb.API + if msgType != config.RemoteWriteProtoMsgV1 && msgType != config.RemoteWriteProtoMsgV2 { + level.Error(logger).Log("Not accepted msg type", "msgType", msgType, "err", err) + http.Error(w, err.Error(), http.StatusUnsupportedMediaType) + return + } + + enc := r.Header.Get("Content-Encoding") + if enc == "" { + } else if enc != string(remote.SnappyBlockCompression) { + err := fmt.Errorf("%v encoding (compression) is not accepted by this server; only %v is acceptable", enc, remote.SnappyBlockCompression) + level.Error(logger).Log("Error decoding remote write request", "err", err) + http.Error(w, err.Error(), http.StatusUnsupportedMediaType) + return } - if _, err := push(ctx, &req.WriteRequest); err != nil { - resp, ok := httpgrpc.HTTPResponseFromError(err) - if !ok { - http.Error(w, err.Error(), http.StatusInternalServerError) + switch msgType { + case config.RemoteWriteProtoMsgV1: + var req cortexpb.PreallocWriteRequest + err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy) + if err != nil { + level.Error(logger).Log("err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) return } - if resp.GetCode()/100 == 5 { - level.Error(logger).Log("msg", "push error", "err", err) - } else if resp.GetCode() != http.StatusAccepted && resp.GetCode() != http.StatusTooManyRequests { - level.Warn(logger).Log("msg", "push refused", "err", err) + + req.SkipLabelNameValidation = false + if req.Source == 0 { + req.Source = cortexpb.API + } + + if _, err := push(ctx, &req.WriteRequest); err != nil { + resp, ok := httpgrpc.HTTPResponseFromError(err) + if !ok { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if resp.GetCode()/100 == 5 { + level.Error(logger).Log("msg", "push error", "err", err) + } else if resp.GetCode() != http.StatusAccepted && resp.GetCode() != http.StatusTooManyRequests { + level.Warn(logger).Log("msg", "push refused", "err", err) + } + http.Error(w, string(resp.Body), int(resp.Code)) + } + case config.RemoteWriteProtoMsgV2: + if remoteWrite2Enabled { + var req writev2.Request + err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy) + if err != nil { + level.Error(logger).Log("err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + v1Req, err := convertV2RequestToV1(&req) + if err != nil { + level.Error(logger).Log("err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + v1Req.SkipLabelNameValidation = false + // Current source is only API + if v1Req.Source == 0 { + v1Req.Source = cortexpb.API + } + + if resp, err := push(ctx, &v1Req.WriteRequest); err != nil { + resp, ok := httpgrpc.HTTPResponseFromError(err) + w.Header().Set(rw20WrittenSamplesHeader, "0") + w.Header().Set(rw20WrittenHistogramsHeader, "0") + w.Header().Set(rw20WrittenExemplarsHeader, "0") + if !ok { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if resp.GetCode()/100 == 5 { + level.Error(logger).Log("msg", "push error", "err", err) + } else if resp.GetCode() != http.StatusAccepted && resp.GetCode() != http.StatusTooManyRequests { + level.Warn(logger).Log("msg", "push refused", "err", err) + } + http.Error(w, string(resp.Body), int(resp.Code)) + } else { + w.Header().Set(rw20WrittenSamplesHeader, strconv.FormatInt(resp.Samples, 10)) + w.Header().Set(rw20WrittenHistogramsHeader, strconv.FormatInt(resp.Histograms, 10)) + w.Header().Set(rw20WrittenExemplarsHeader, strconv.FormatInt(resp.Exemplars, 10)) + } + } else { + level.Error(logger).Log(errMsgNotEnabledPRW2) + http.Error(w, errMsgNotEnabledPRW2, http.StatusUnsupportedMediaType) + return } - http.Error(w, string(resp.Body), int(resp.Code)) } }) } + +// Refer to parseProtoMsg in https://github.com/prometheus/prometheus/blob/main/storage/remote/write_handler.go +func parseProtoMsg(contentType string) (config.RemoteWriteProtoMsg, error) { + contentType = strings.TrimSpace(contentType) + + parts := strings.Split(contentType, ";") + if parts[0] != appProtoContentType { + return "", fmt.Errorf("expected %v as the first (media) part, got %v content-type", appProtoContentType, contentType) + } + // Parse potential https://www.rfc-editor.org/rfc/rfc9110#parameter + for _, p := range parts[1:] { + pair := strings.Split(p, "=") + if len(pair) != 2 { + return "", fmt.Errorf("as per https://www.rfc-editor.org/rfc/rfc9110#parameter expected parameters to be key-values, got %v in %v content-type", p, contentType) + } + if pair[0] == "proto" { + ret := config.RemoteWriteProtoMsg(pair[1]) + if err := ret.Validate(); err != nil { + return "", fmt.Errorf("got %v content type; %w", contentType, err) + } + return ret, nil + } + } + // No "proto=" parameter, assuming v1. + return config.RemoteWriteProtoMsgV1, nil +} + +func convertV2RequestToV1(req *writev2.Request) (cortexpb.PreallocWriteRequest, error) { + var v1Req cortexpb.PreallocWriteRequest + v1Timeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries)) + var v1Metadata []*cortexpb.MetricMetadata + + b := labels.NewScratchBuilder(0) + symbols := req.Symbols + for _, v2Ts := range req.Timeseries { + lbs := v2Ts.ToLabels(&b, symbols) + v1Timeseries = append(v1Timeseries, cortexpb.PreallocTimeseries{ + TimeSeries: &cortexpb.TimeSeries{ + Labels: cortexpb.FromLabelsToLabelAdapters(lbs), + Samples: convertV2ToV1Samples(v2Ts.Samples), + Exemplars: convertV2ToV1Exemplars(b, symbols, v2Ts.Exemplars), + Histograms: convertV2ToV1Histograms(v2Ts.Histograms), + }, + }) + + if shouldConvertV2Metadata(v2Ts.Metadata) { + metricName, err := extract.MetricNameFromLabels(lbs) + if err != nil { + return v1Req, err + } + v1Metadata = append(v1Metadata, convertV2ToV1Metadata(metricName, symbols, v2Ts.Metadata)) + } + } + + v1Req.Timeseries = v1Timeseries + v1Req.Metadata = v1Metadata + + return v1Req, nil +} + +func shouldConvertV2Metadata(metadata writev2.Metadata) bool { + return !(metadata.HelpRef == 0 && metadata.UnitRef == 0 && metadata.Type == writev2.Metadata_METRIC_TYPE_UNSPECIFIED) +} + +func convertV2ToV1Histograms(histograms []writev2.Histogram) []cortexpb.Histogram { + v1Histograms := make([]cortexpb.Histogram, 0, len(histograms)) + + for _, h := range histograms { + v1Histograms = append(v1Histograms, cortexpb.HistogramWriteV2ProtoToHistogramProto(h)) + } + + return v1Histograms +} + +func convertV2ToV1Samples(samples []writev2.Sample) []cortexpb.Sample { + v1Samples := make([]cortexpb.Sample, 0, len(samples)) + + for _, s := range samples { + v1Samples = append(v1Samples, cortexpb.Sample{ + Value: s.Value, + TimestampMs: s.Timestamp, + }) + } + + return v1Samples +} + +func convertV2ToV1Metadata(name string, symbols []string, metadata writev2.Metadata) *cortexpb.MetricMetadata { + t := cortexpb.UNKNOWN + + switch metadata.Type { + case writev2.Metadata_METRIC_TYPE_COUNTER: + t = cortexpb.COUNTER + case writev2.Metadata_METRIC_TYPE_GAUGE: + t = cortexpb.GAUGE + case writev2.Metadata_METRIC_TYPE_HISTOGRAM: + t = cortexpb.HISTOGRAM + case writev2.Metadata_METRIC_TYPE_GAUGEHISTOGRAM: + t = cortexpb.GAUGEHISTOGRAM + case writev2.Metadata_METRIC_TYPE_SUMMARY: + t = cortexpb.SUMMARY + case writev2.Metadata_METRIC_TYPE_INFO: + t = cortexpb.INFO + case writev2.Metadata_METRIC_TYPE_STATESET: + t = cortexpb.STATESET + } + + return &cortexpb.MetricMetadata{ + Type: t, + MetricFamilyName: name, + Unit: symbols[metadata.UnitRef], + Help: symbols[metadata.HelpRef], + } +} + +func convertV2ToV1Exemplars(b labels.ScratchBuilder, symbols []string, v2Exemplars []writev2.Exemplar) []cortexpb.Exemplar { + v1Exemplars := make([]cortexpb.Exemplar, 0, len(v2Exemplars)) + for _, e := range v2Exemplars { + promExemplar := e.ToExemplar(&b, symbols) + v1Exemplars = append(v1Exemplars, cortexpb.Exemplar{ + Labels: cortexpb.FromLabelsToLabelAdapters(promExemplar.Labels), + Value: e.Value, + TimestampMs: e.Timestamp, + }) + } + return v1Exemplars +} diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index b806011a61..85fcaf29f6 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -10,6 +10,8 @@ import ( "github.com/golang/snappy" "github.com/prometheus/prometheus/prompb" + writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" + "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/middleware" @@ -17,30 +19,239 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" ) +func Test_convertV2RequestToV1(t *testing.T) { + var v2Req writev2.Request + + fh := tsdbutil.GenerateTestFloatHistogram(1) + ph := writev2.FromFloatHistogram(4, fh) + + symbols := []string{"", "__name__", "test_metric", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"} + timeseries := []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + Metadata: writev2.Metadata{ + Type: writev2.Metadata_METRIC_TYPE_COUNTER, + + HelpRef: 15, + UnitRef: 16, + }, + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}}, + }, + { + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, + }, + { + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + Samples: []writev2.Sample{{Value: 3, Timestamp: 3}}, + }, + { + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + Histograms: []writev2.Histogram{ph, ph}, + Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}}, + }, + } + + v2Req.Symbols = symbols + v2Req.Timeseries = timeseries + v1Req, err := convertV2RequestToV1(&v2Req) + assert.NoError(t, err) + expectedSamples := 3 + expectedExemplars := 2 + expectedHistograms := 2 + countSamples := 0 + countExemplars := 0 + countHistograms := 0 + + for _, ts := range v1Req.Timeseries { + countSamples += len(ts.Samples) + countExemplars += len(ts.Exemplars) + countHistograms += len(ts.Histograms) + } + + assert.Equal(t, expectedSamples, countSamples) + assert.Equal(t, expectedExemplars, countExemplars) + assert.Equal(t, expectedHistograms, countHistograms) + assert.Equal(t, 4, len(v1Req.Timeseries)) + assert.Equal(t, 1, len(v1Req.Metadata)) +} + func TestHandler_remoteWrite(t *testing.T) { - req := createRequest(t, createPrometheusRemoteWriteProtobuf(t)) - resp := httptest.NewRecorder() - handler := Handler(100000, nil, verifyWriteRequestHandler(t, cortexpb.API)) - handler.ServeHTTP(resp, req) - assert.Equal(t, 200, resp.Code) + t.Run("remote write v1", func(t *testing.T) { + handler := Handler(true, 100000, nil, verifyWriteRequestHandler(t, cortexpb.API)) + req := createRequest(t, createPrometheusRemoteWriteProtobuf(t), false) + resp := httptest.NewRecorder() + handler.ServeHTTP(resp, req) + assert.Equal(t, http.StatusOK, resp.Code) + }) + t.Run("remote write v2", func(t *testing.T) { + handler := Handler(true, 100000, nil, verifyWriteRequestHandler(t, cortexpb.API)) + req := createRequest(t, createPrometheusRemoteWriteV2Protobuf(t), true) + resp := httptest.NewRecorder() + handler.ServeHTTP(resp, req) + assert.Equal(t, http.StatusOK, resp.Code) + + // test header value + respHeader := resp.Header() + assert.Equal(t, "1", respHeader[rw20WrittenSamplesHeader][0]) + assert.Equal(t, "1", respHeader[rw20WrittenHistogramsHeader][0]) + assert.Equal(t, "1", respHeader[rw20WrittenExemplarsHeader][0]) + }) + t.Run("remote write v2 with not support remote write 2.0", func(t *testing.T) { + handler := Handler(false, 100000, nil, verifyWriteRequestHandler(t, cortexpb.API)) + req := createRequest(t, createPrometheusRemoteWriteV2Protobuf(t), true) + resp := httptest.NewRecorder() + handler.ServeHTTP(resp, req) + assert.Equal(t, http.StatusUnsupportedMediaType, resp.Code) + }) +} + +func TestHandler_ContentTypeAndEncoding(t *testing.T) { + sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)") + handler := Handler(true, 100000, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API)) + + tests := []struct { + description string + reqHeaders map[string]string + expectedCode int + isV2 bool + }{ + { + description: "[RW 2.0] correct content-type", + reqHeaders: map[string]string{ + "Content-Type": appProtoV2ContentType, + "Content-Encoding": "snappy", + remoteWriteVersionHeader: "2.0.0", + }, + expectedCode: http.StatusOK, + isV2: true, + }, + { + description: "[RW 1.0] correct content-type", + reqHeaders: map[string]string{ + "Content-Type": appProtoV1ContentType, + "Content-Encoding": "snappy", + remoteWriteVersionHeader: "0.1.0", + }, + expectedCode: http.StatusOK, + isV2: false, + }, + { + description: "[RW 2.0] wrong content-type", + reqHeaders: map[string]string{ + "Content-Type": "yolo", + "Content-Encoding": "snappy", + remoteWriteVersionHeader: "2.0.0", + }, + expectedCode: http.StatusUnsupportedMediaType, + isV2: true, + }, + { + description: "[RW 2.0] wrong content-type", + reqHeaders: map[string]string{ + "Content-Type": "application/x-protobuf;proto=yolo", + "Content-Encoding": "snappy", + remoteWriteVersionHeader: "2.0.0", + }, + expectedCode: http.StatusUnsupportedMediaType, + isV2: true, + }, + { + description: "[RW 2.0] wrong content-encoding", + reqHeaders: map[string]string{ + "Content-Type": "application/x-protobuf;proto=io.prometheus.write.v2.Request", + "Content-Encoding": "zstd", + remoteWriteVersionHeader: "2.0.0", + }, + expectedCode: http.StatusUnsupportedMediaType, + isV2: true, + }, + { + description: "no header, should treated as RW 1.0", + expectedCode: http.StatusOK, + isV2: false, + }, + { + description: "missing content-type, should treated as RW 1.0", + reqHeaders: map[string]string{ + "Content-Encoding": "snappy", + remoteWriteVersionHeader: "2.0.0", + }, + expectedCode: http.StatusOK, + isV2: false, + }, + { + description: "missing content-encoding", + reqHeaders: map[string]string{ + "Content-Type": appProtoV2ContentType, + remoteWriteVersionHeader: "2.0.0", + }, + expectedCode: http.StatusOK, + isV2: true, + }, + { + description: "missing remote write version, should treated based on Content-type", + reqHeaders: map[string]string{ + "Content-Type": appProtoV2ContentType, + "Content-Encoding": "snappy", + }, + expectedCode: http.StatusOK, + isV2: true, + }, + { + description: "missing remote write version, should treated based on Content-type", + reqHeaders: map[string]string{ + "Content-Type": appProtoV1ContentType, + "Content-Encoding": "snappy", + }, + expectedCode: http.StatusOK, + isV2: false, + }, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + if test.isV2 { + req := createRequestWithHeaders(t, test.reqHeaders, createCortexRemoteWriteV2Protobuf(t, false, cortexpb.API)) + resp := httptest.NewRecorder() + handler.ServeHTTP(resp, req) + assert.Equal(t, test.expectedCode, resp.Code) + } else { + req := createRequestWithHeaders(t, test.reqHeaders, createCortexWriteRequestProtobuf(t, false, cortexpb.API)) + resp := httptest.NewRecorder() + handler.ServeHTTP(resp, req) + assert.Equal(t, test.expectedCode, resp.Code) + } + }) + } } func TestHandler_cortexWriteRequest(t *testing.T) { - req := createRequest(t, createCortexWriteRequestProtobuf(t, false)) - resp := httptest.NewRecorder() sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)") - handler := Handler(100000, sourceIPs, verifyWriteRequestHandler(t, cortexpb.RULE)) - handler.ServeHTTP(resp, req) - assert.Equal(t, 200, resp.Code) + handler := Handler(true, 100000, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API)) + + t.Run("remote write v1", func(t *testing.T) { + req := createRequest(t, createCortexWriteRequestProtobuf(t, false, cortexpb.API), false) + resp := httptest.NewRecorder() + handler.ServeHTTP(resp, req) + assert.Equal(t, 200, resp.Code) + }) + t.Run("remote write v2", func(t *testing.T) { + req := createRequest(t, createCortexRemoteWriteV2Protobuf(t, false, cortexpb.API), true) + resp := httptest.NewRecorder() + handler.ServeHTTP(resp, req) + assert.Equal(t, 200, resp.Code) + }) } func TestHandler_ignoresSkipLabelNameValidationIfSet(t *testing.T) { for _, req := range []*http.Request{ - createRequest(t, createCortexWriteRequestProtobuf(t, true)), - createRequest(t, createCortexWriteRequestProtobuf(t, false)), + createRequest(t, createCortexWriteRequestProtobuf(t, true, cortexpb.RULE), false), + createRequest(t, createCortexWriteRequestProtobuf(t, true, cortexpb.RULE), false), } { resp := httptest.NewRecorder() - handler := Handler(100000, nil, verifyWriteRequestHandler(t, cortexpb.RULE)) + handler := Handler(true, 100000, nil, verifyWriteRequestHandler(t, cortexpb.RULE)) handler.ServeHTTP(resp, req) assert.Equal(t, 200, resp.Code) } @@ -54,21 +265,86 @@ func verifyWriteRequestHandler(t *testing.T, expectSource cortexpb.WriteRequest_ assert.Equal(t, "foo", request.Timeseries[0].Labels[0].Value) assert.Equal(t, expectSource, request.Source) assert.False(t, request.SkipLabelNameValidation) - return &cortexpb.WriteResponse{}, nil + + resp := &cortexpb.WriteResponse{ + Samples: 1, + Histograms: 1, + Exemplars: 1, + } + + return resp, nil + } +} + +func createRequestWithHeaders(t *testing.T, headers map[string]string, protobuf []byte) *http.Request { + t.Helper() + inoutBytes := snappy.Encode(nil, protobuf) + req, err := http.NewRequest("POST", "http://localhost/", bytes.NewReader(inoutBytes)) + require.NoError(t, err) + + for k, v := range headers { + req.Header.Set(k, v) } + return req } -func createRequest(t *testing.T, protobuf []byte) *http.Request { +func createRequest(t *testing.T, protobuf []byte, isV2 bool) *http.Request { t.Helper() inoutBytes := snappy.Encode(nil, protobuf) req, err := http.NewRequest("POST", "http://localhost/", bytes.NewReader(inoutBytes)) require.NoError(t, err) + req.Header.Add("Content-Encoding", "snappy") - req.Header.Set("Content-Type", "application/x-protobuf") - req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + + if isV2 { + req.Header.Set("Content-Type", appProtoV2ContentType) + req.Header.Set("X-Prometheus-Remote-Write-Version", remoteWriteVersion20HeaderValue) + return req + } + + req.Header.Set("Content-Type", appProtoContentType) + req.Header.Set("X-Prometheus-Remote-Write-Version", remoteWriteVersion1HeaderValue) return req } +func createCortexRemoteWriteV2Protobuf(t *testing.T, skipLabelNameValidation bool, source cortexpb.WriteRequest_SourceEnum) []byte { + t.Helper() + input := writev2.Request{ + Symbols: []string{"", "__name__", "foo"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2}, + Samples: []writev2.Sample{ + {Value: 1, Timestamp: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()}, + }, + }, + }, + } + + inoutBytes, err := input.Marshal() + require.NoError(t, err) + return inoutBytes +} + +func createPrometheusRemoteWriteV2Protobuf(t *testing.T) []byte { + t.Helper() + input := writev2.Request{ + Symbols: []string{"", "__name__", "foo"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2}, + Samples: []writev2.Sample{ + {Value: 1, Timestamp: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()}, + }, + }, + }, + } + + inoutBytes, err := input.Marshal() + require.NoError(t, err) + return inoutBytes +} + func createPrometheusRemoteWriteProtobuf(t *testing.T) []byte { t.Helper() input := prompb.WriteRequest{ @@ -87,7 +363,7 @@ func createPrometheusRemoteWriteProtobuf(t *testing.T) []byte { require.NoError(t, err) return inoutBytes } -func createCortexWriteRequestProtobuf(t *testing.T, skipLabelNameValidation bool) []byte { +func createCortexWriteRequestProtobuf(t *testing.T, skipLabelNameValidation bool, source cortexpb.WriteRequest_SourceEnum) []byte { t.Helper() ts := cortexpb.PreallocTimeseries{ TimeSeries: &cortexpb.TimeSeries{ @@ -101,7 +377,7 @@ func createCortexWriteRequestProtobuf(t *testing.T, skipLabelNameValidation bool } input := cortexpb.WriteRequest{ Timeseries: []cortexpb.PreallocTimeseries{ts}, - Source: cortexpb.RULE, + Source: source, SkipLabelNameValidation: skipLabelNameValidation, } inoutBytes, err := input.Marshal()