diff --git a/go.mod b/go.mod index ae6c454..0d55e06 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,9 @@ go 1.20 require ( github.com/prometheus/client_golang v1.19.1 + github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.54.0 + google.golang.org/protobuf v1.34.1 k8s.io/klog/v2 v2.120.1 ) @@ -13,11 +15,8 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/procfs v0.15.1 // indirect golang.org/x/sys v0.21.0 // indirect - google.golang.org/protobuf v1.34.1 // indirect ) diff --git a/go.sum b/go.sum index 9be6158..918d333 100644 --- a/go.sum +++ b/go.sum @@ -1,26 +1,17 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= -github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= -github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= -github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -29,41 +20,25 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= -github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= -github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 h1:v7DLqVdK4VrYkVD5diGdl4sxJurKJEMnODWRJlxV9oM= -github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= -github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM= -github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= github.com/prometheus/common v0.54.0 h1:ZlZy0BgJhTwVZUn7dLOkwCZHUkrAqd3WYtcFCWnM1D8= github.com/prometheus/common v0.54.0/go.mod h1:/TQgMJP5CuVYveyT7n/0Ix8yLNNXy9yRSkhnLTHPDIQ= -github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI= -github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/oauth2 v0.12.0 h1:smVPGxink+n1ZI5pkQa8y6fZT0RW0MgCO5bFpepy4B4= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= +golang.org/x/oauth2 v0.19.0 h1:9+E/EZBCbTLNrbN35fHv/a/d/mOBatymz1zbtQrXpIg= golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= -k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0= -k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= diff --git a/main.go b/main.go index 3fe4be3..2f26d8e 100644 --- a/main.go +++ b/main.go @@ -4,24 +4,31 @@ import ( "context" "crypto/tls" "flag" - "fmt" + "maps" "net" "net/http" "os" "path/filepath" + "strconv" + "strings" + "sync" "time" "github.com/prometheus/client_golang/api" v1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" + "google.golang.org/protobuf/proto" "k8s.io/klog/v2" ) var ( insecureListenAddress string upstream string + metadataUpstream string tlsSkipVerify bool bearerFile string forceGet bool @@ -30,12 +37,16 @@ var ( func parseFlag() { flag.StringVar(&insecureListenAddress, "insecure-listen-address", "127.0.0.1:9099", "The address which proxy listens on") flag.StringVar(&upstream, "upstream", "http://127.0.0.1:9090", "The upstream thanos URL") + flag.StringVar(&metadataUpstream, "metadata-upstream", "http://127.0.0.1:9090", "The upstream metadata API provider URL") flag.BoolVar(&tlsSkipVerify, "tlsSkipVerify", false, "Skip TLS Verification") flag.StringVar(&bearerFile, "bearer-file", "", "File containing bearer token for API requests") flag.BoolVar(&forceGet, "force-get", false, "Force api.Client to use GET by rejecting POST requests") flag.Parse() } +var metricMetadata map[string][]v1.Metadata +var mutex = &sync.RWMutex{} + func main() { parseFlag() ctx, cancel := context.WithCancel(context.Background()) @@ -61,6 +72,16 @@ func main() { if err != nil { klog.Fatalf("error creating API client:", err) } + + // Create a new client for metadata. + metadataC, err := api.NewClient(api.Config{ + Address: metadataUpstream, + RoundTripper: roundTripper, + }) + if err != nil { + klog.Fatalf("error creating API metadata client:", err) + } + // Collect client options options := []clientOption{} if bearerFile != "" { @@ -84,6 +105,35 @@ func main() { } apiClient := v1.NewAPI(c) + if metadataC, err = newClient(metadataC, options...); err != nil { + klog.Fatalf("error building custom metadata API client:", err) + } + apiMetadataClient := v1.NewAPI(metadataC) + + go func() { + var err error + for { + select { + case <-ctx.Done(): + return + default: + } + + klog.Infof("refreshing metric metadata") + mutex.Lock() + metadataCtx, metadataCancel := context.WithTimeout(ctx, 5*time.Second) + metricMetadata, err = apiMetadataClient.Metadata(metadataCtx, "", "") + mutex.Unlock() + if err != nil { + klog.Errorf("error refreshing metric metadata: %s", err.Error()) + time.Sleep(250 * time.Millisecond) + } else { + time.Sleep(1 * time.Minute) + } + metadataCancel() + } + }() + // server mux mux := http.NewServeMux() mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { @@ -100,6 +150,9 @@ func federate(ctx context.Context, w http.ResponseWriter, r *http.Request, apiCl params := r.URL.Query() matchQueries := params["match[]"] + // negotiate content type, this will inform the encoder how to format the output + contentType := expfmt.NegotiateIncludingOpenMetrics(r.Header) + nctx, ncancel := context.WithTimeout(r.Context(), 2*time.Minute) defer ncancel() if params.Del("match[]"); len(params) > 0 { @@ -112,7 +165,7 @@ func federate(ctx context.Context, w http.ResponseWriter, r *http.Request, apiCl responseTime := time.Since(start).Seconds() if err != nil { - klog.Errorf("query failed:", err) + klog.Errorf("query failed: %s", err.Error()) scrapeDurations.With(prometheus.Labels{ "match_query": matchQuery, "status_code": "500", @@ -136,13 +189,161 @@ func federate(ctx context.Context, w http.ResponseWriter, r *http.Request, apiCl "match_query": matchQuery, "status_code": "200", }).Observe(responseTime) - printVector(w, val) + printVector(w, contentType, val) } } -func printVector(w http.ResponseWriter, v model.Value) { +// this is an alternative implementation of the original code. In order to be as compliant as possible with +// prometheus endpoint content negotiation, we need to transform model.Value into io_prometheus_client.MetricFamily +// to be able to reuse the expfmt package. +// +// This requires a bunch of manual work since there is nothing in the SDK that supports this path, and is subject +// to be broken if the SDK changes. +// +// In order to be able to create a MetricFamily we rely on Prometheus Metadata API. Thanos Queriers do not support +// this API, so the upstream needs to be the Prometheus server. That kind of compromises the whole point of this module. +// +// The metadata is refreshed asynchronously on a predefined interval not to put that on the critical path +func printVector(w http.ResponseWriter, contentType expfmt.Format, v model.Value) { vec := v.(model.Vector) + encoder := expfmt.NewEncoder(w, contentType) + + // clone to prevent locking the metadata for the entire duration of the encoding + mutex.RLock() + mMetadata := maps.Clone(metricMetadata) + mutex.RUnlock() + + metricFamilies := make(map[string]*io_prometheus_client.MetricFamily) for _, sample := range vec { - fmt.Fprintf(w, "%v %v %v\n", sample.Metric, sample.Value, int(sample.Timestamp)) + // value.Metric brings the metric name as a label, so we allocate one less so it does not end up as a label in the metric family + labelPairs := make([]*io_prometheus_client.LabelPair, 0, len(sample.Metric)-1) // we dont add metric name as label + var metricName string + for labelName, labelValue := range sample.Metric { + ln := string(labelName) + lv := string(labelValue) + if ln == model.MetricNameLabel { + metricName = lv + continue + } + labelPairs = append(labelPairs, &io_prometheus_client.LabelPair{ + Name: &ln, + Value: &lv, + }) + } + + strippedMetricName := metricName + if strings.HasSuffix(metricName, "_bucket") || strings.HasSuffix(metricName, "_sum") || + strings.HasSuffix(metricName, "_count") || strings.HasSuffix(metricName, "_total") { + strippedMetricName = metricName[:strings.LastIndex(metricName, "_")] + } + mms, ok := mMetadata[strippedMetricName] + if !ok { + // either metadata refresh will eventually fix this or there is something else going on + klog.Warningf("metadata not found for metric %s, metric metadata will be stubbed out and type set to unknown", metricName) + } + + if len(mms) > 1 { + // FIXME how to deal with this? + klog.Warningf("metric %s has multiple metadata entries, using the first one", metricName) + } + + // recover on unknown metrics by stubbing it out and setting to unknown type + var mm v1.Metadata + if len(mms) == 0 { + mm = v1.Metadata{ + Type: v1.MetricTypeUnknown, + Help: "no help found", + Unit: "no unit found", + } + } else { + mm = mms[0] + } + + mf, ok := metricFamilies[strippedMetricName] + if !ok { + var mType io_prometheus_client.MetricType + switch mm.Type { + case v1.MetricTypeCounter: + mType = io_prometheus_client.MetricType_COUNTER + case v1.MetricTypeGauge: + mType = io_prometheus_client.MetricType_GAUGE + case v1.MetricTypeHistogram: + mType = io_prometheus_client.MetricType_HISTOGRAM + case v1.MetricTypeSummary: + mType = io_prometheus_client.MetricType_SUMMARY + case v1.MetricTypeUnknown: + mType = io_prometheus_client.MetricType_UNTYPED + default: + klog.Warningf("unknown metric type %s for metric %s, dropping metric", mm.Type, strippedMetricName) + } + + mf = &io_prometheus_client.MetricFamily{ + Name: &metricName, + Help: &mm.Help, + Type: &mType, + Unit: &mm.Unit, + } + + mf.Metric = make([]*io_prometheus_client.Metric, 0) + metricFamilies[strippedMetricName] = mf + } + + metric := &io_prometheus_client.Metric{ + Label: labelPairs, + TimestampMs: proto.Int64(sample.Timestamp.UnixNano() / 1000), + } + switch mm.Type { + case v1.MetricTypeCounter: + metric.Counter = &io_prometheus_client.Counter{ + Value: proto.Float64(float64(sample.Value)), + } + case v1.MetricTypeGauge: + metric.Gauge = &io_prometheus_client.Gauge{ + Value: proto.Float64(float64(sample.Value)), + } + case v1.MetricTypeHistogram: + metric.Histogram = &io_prometheus_client.Histogram{ + SampleCount: proto.Uint64(0), + SampleSum: proto.Float64(0), + } + hist := sample.Histogram + if hist != nil { + metric.Histogram = &io_prometheus_client.Histogram{ + SampleCount: proto.Uint64(uint64(hist.Count)), + SampleSum: proto.Float64(float64(hist.Sum)), + Bucket: make([]*io_prometheus_client.Bucket, 0, len(hist.Buckets)), + } + for _, bucket := range sample.Histogram.Buckets { + upperFloat, err := strconv.ParseFloat(bucket.Upper.String(), 64) + if err != nil { + klog.Warningf("error parsing bucket upper bound %s on histogram %s, dropping bucket", bucket.Upper, strippedMetricName) + continue + } + b := &io_prometheus_client.Bucket{ + UpperBound: proto.Float64(upperFloat), + CumulativeCount: proto.Uint64(uint64(bucket.Count)), + } + metric.Histogram.Bucket = append(metric.Histogram.Bucket, b) + } + } + case v1.MetricTypeSummary: + // FIXME how to deal with this? + klog.Warningf("unsupported summary type on metric %s, dropping metric", strippedMetricName) + continue + case v1.MetricTypeUnknown: + metric.Untyped = &io_prometheus_client.Untyped{ + Value: proto.Float64(float64(sample.Value)), + } + default: + klog.Warningf("unknown metric type %s on %s, dropping metric", mm.Type, strippedMetricName) + continue + } + mf.Metric = append(mf.Metric, metric) + } + + for _, fm := range metricFamilies { + if err := encoder.Encode(fm); err != nil { + klog.Errorf("error encoding metric family: %s", err.Error()) + } } }