diff --git a/metrics/config.go b/metrics/config.go index d4ae24ad54..535c8e9bce 100644 --- a/metrics/config.go +++ b/metrics/config.go @@ -208,7 +208,7 @@ func createMetricsConfig(ctx context.Context, ops ExporterOptions) (*metricsConf } switch lb := metricsBackend(strings.ToLower(backend)); lb { - case stackdriver, prometheus, openCensus: + case stackdriver, prometheus, openCensus, none: mc.backendDestination = lb default: return nil, fmt.Errorf("unsupported metrics backend value %q", backend) diff --git a/metrics/e2e_test.go b/metrics/e2e_test.go new file mode 100644 index 0000000000..2163f0c1b6 --- /dev/null +++ b/metrics/e2e_test.go @@ -0,0 +1,620 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package metrics + +import ( + "context" + "errors" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "os" + "sort" + "strings" + "sync" + "testing" + "time" + + sd "contrib.go.opencensus.io/exporter/stackdriver" + ocmetrics "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" + ocresource "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" + "go.opencensus.io/resource" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + "k8s.io/apimachinery/pkg/util/wait" + + emptypb "github.com/golang/protobuf/ptypes/empty" + "github.com/google/go-cmp/cmp" + "google.golang.org/api/option" + metricpb "google.golang.org/genproto/googleapis/api/metric" + stackdriverpb "google.golang.org/genproto/googleapis/monitoring/v3" + "google.golang.org/grpc" + proto "google.golang.org/protobuf/proto" + + logtesting "knative.dev/pkg/logging/testing" + "knative.dev/pkg/metrics/metricskey" + "knative.dev/pkg/metrics/metricstest" +) + +var ( + NamespaceTagKey = tag.MustNewKey(metricskey.LabelNamespaceName) + ServiceTagKey = tag.MustNewKey(metricskey.LabelServiceName) + ConfigTagKey = tag.MustNewKey(metricskey.LabelConfigurationName) + RevisionTagKey = tag.MustNewKey(metricskey.LabelRevisionName) +) + +type metricExtract struct { + Name string + Labels map[string]string + Value int64 +} + +func (m metricExtract) Key() string { + return fmt.Sprintf("%s<%s>", m.Name, resource.EncodeLabels(m.Labels)) +} + +func (m metricExtract) String() string { + return fmt.Sprintf("%s:%d", m.Key(), m.Value) +} + +func initStackdriverFake(sdFake *stackDriverFake) error { + if err := sdFake.start(); err != nil { + return err + } + conn, err := grpc.Dial(sdFake.address, grpc.WithInsecure()) + if err != nil { + return err + } + newStackdriverExporterFunc = func(o sd.Options) (view.Exporter, error) { + o.MonitoringClientOptions = append(o.MonitoringClientOptions, option.WithGRPCConn(conn)) + return newOpencensusSDExporter(o) + } + // 
File: must exist, be json of credentialsFile, and type must be a jwtConfig or oauth2Config + tmp, err := ioutil.TempFile("", "metrics-sd-test") + if err != nil { + return err + } + defer tmp.Close() + credentialsContent := []byte(`{"type": "service_account"}`) + if _, err := tmp.Write(credentialsContent); err != nil { + return err + } + os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", tmp.Name()) + return nil +} + +func sortMetrics() cmp.Option { + return cmp.Transformer("Sort", func(in []metricExtract) []string { + out := make([]string, 0, len(in)) + seen := map[string]int{} + for _, m := range in { + // Keep only the newest report for a key + key := m.Key() + if seen[key] == 0 { + out = append(out, m.String()) + seen[key] = len(out) // Store address+1 to avoid doubling first item. + } else { + out[seen[key]-1] = m.String() + } + } + sort.Strings(out) + return out + }) +} + +// Begin table tests for exporters +func TestMetricsExport(t *testing.T) { + TestOverrideBundleCount = 1 + t.Cleanup(func() { TestOverrideBundleCount = 0 }) + ocFake := openCensusFake{address: "localhost:12345"} + sdFake := stackDriverFake{} + prometheusPort := 19090 + configForBackend := func(backend metricsBackend) ExporterOptions { + return ExporterOptions{ + Domain: servingDomain, + Component: testComponent, + PrometheusPort: prometheusPort, + ConfigMap: map[string]string{ + BackendDestinationKey: string(backend), + collectorAddressKey: ocFake.address, + allowStackdriverCustomMetricsKey: "true", + stackdriverCustomMetricSubDomainKey: servingDomain, + reportingPeriodKey: "1", + }, + } + } + + resources := []*resource.Resource{{ + Type: "revision", + Labels: map[string]string{ + "project": "p1", + "revision": "r1", + }, + }, { + Type: "revision", + Labels: map[string]string{ + "project": "p1", + "revision": "r2", + }, + }} + gauge := stats.Int64("testing/value", "Stored value", stats.UnitDimensionless) + counter := stats.Int64("export counts", "Times through the export", stats.UnitDimensionless) 
+ gaugeView := &view.View{ + Name: "testing/value", + Description: "Test value", + Measure: gauge, + Aggregation: view.LastValue(), + } + resourceCounter := &view.View{ + Name: "resource_global_export_count", + Description: "Count of exports via RegisterResourceView.", + Measure: counter, + Aggregation: view.Count(), + } + globalCounter := &view.View{ + Name: "global_export_counts", + Description: "Count of exports via standard OpenCensus view.", + Measure: counter, + Aggregation: view.Count(), + } + + expected := []metricExtract{ + {"knative.dev/serving/testComponent/global_export_counts", map[string]string{}, 2}, + {"knative.dev/serving/testComponent/resource_global_export_count", map[string]string{}, 2}, + {"knative.dev/serving/testComponent/testing/value", map[string]string{"project": "p1", "revision": "r1"}, 0}, + {"knative.dev/serving/testComponent/testing/value", map[string]string{"project": "p1", "revision": "r2"}, 1}, + } + + harnesses := []struct { + name string + init func(t *testing.T) error + validate func(t *testing.T) + }{{ + name: "Prometheus", + init: func(t *testing.T) error { + if err := UpdateExporter(context.Background(), configForBackend(prometheus), logtesting.TestLogger(t)); err != nil { + return err + } + // Wait for the webserver to actually start serving metrics + return wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", prometheusPort)) + return err == nil && resp.StatusCode == http.StatusOK, nil + }) + }, + validate: func(t *testing.T) { + metricstest.EnsureRecorded() + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", prometheusPort)) + if err != nil { + t.Fatalf("failed to fetch prometheus metrics: %+v", err) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("failed to read prometheus response: %+v", err) + } + const want = `# HELP testComponent_global_export_counts Count of 
exports via standard OpenCensus view. +# TYPE testComponent_global_export_counts counter +testComponent_global_export_counts 2 +# HELP testComponent_resource_global_export_count Count of exports via RegisterResourceView. +# TYPE testComponent_resource_global_export_count counter +testComponent_resource_global_export_count 2 +# HELP testComponent_testing_value Test value +# TYPE testComponent_testing_value gauge +testComponent_testing_value{project="p1",revision="r1"} 0 +testComponent_testing_value{project="p1",revision="r2"} 1 +` + if diff := cmp.Diff(want, string(body)); diff != "" { + t.Errorf("Unexpected prometheus output (-want +got):\n%s", diff) + } + }, + }, { + name: "OpenCensus", + init: func(t *testing.T) error { + if err := ocFake.start(len(resources) + 1); err != nil { + return err + } + t.Log("Created exporter at", ocFake.address) + return UpdateExporter(context.Background(), configForBackend(openCensus), logtesting.TestLogger(t)) + }, + validate: func(t *testing.T) { + metricstest.EnsureRecorded() + records := []metricExtract{} + // Each Resource has an independent thread invoking reportView; this + // set avoids the race condition where we get two reports for the + // same metric on the channel before we get any reports for another + // metric. 
+ keys := map[string]struct{}{} + timeout := time.After(5 * time.Second) + loop: + for { + select { + case record := <-ocFake.published: + if record == nil { + continue loop + } + for _, m := range record.Metrics { + if len(m.Timeseries) > 0 { + labels := map[string]string{} + if record.Resource != nil { + labels = record.Resource.Labels + } + metric := metricExtract{ + Name: m.MetricDescriptor.Name, + Labels: labels, + Value: m.Timeseries[0].Points[0].GetInt64Value(), + } + records = append(records, metric) + keys[metric.Key()] = struct{}{} + } + } + if len(keys) >= len(expected) { + break loop + } + case <-timeout: + t.Error("Timeout reading input") + break loop + } + } + + if diff := cmp.Diff(expected, records, sortMetrics()); diff != "" { + t.Errorf("Unexpected OpenCensus exports (-want +got):\n%s", diff) + } + }, + }, { + name: "Stackdriver", + init: func(t *testing.T) error { + if err := initStackdriverFake(&sdFake); err != nil { + return err + } + return UpdateExporter(context.Background(), configForBackend(stackdriver), logtesting.TestLogger(t)) + }, + validate: func(t *testing.T) { + records := []metricExtract{} + for record := range sdFake.published { + for _, ts := range record.TimeSeries { + name := ts.Metric.Type[len("custom.googleapis.com/"):] + records = append(records, metricExtract{ + Name: name, + Labels: ts.Resource.Labels, + Value: ts.Points[0].Value.GetInt64Value(), + }) + } + if len(records) >= 4 { + // There's no way to synchronize on the internal timer used + // by metricsexport.IntervalReader, so shut down the + // exporter after the first report cycle. 
+ FlushExporter() + sdFake.srv.GracefulStop() + } + } + if diff := cmp.Diff(expected, records, sortMetrics()); diff != "" { + t.Errorf("Unexpected Stackdriver exports (-want +got):\n%s", diff) + } + }, + }} + + for _, c := range harnesses { + t.Run(c.name, func(t *testing.T) { + ClearMetersForTest() + sdFake.t = t + if err := c.init(t); err != nil { + t.Fatalf("unable to init: %+v", err) + } + + view.Register(globalCounter) + if err := RegisterResourceView(gaugeView, resourceCounter); err != nil { + t.Fatal("Unable to register views:", err) + } + t.Cleanup(func() { + view.Unregister(globalCounter) + UnregisterResourceView(gaugeView, resourceCounter) + }) + + for i, r := range resources { + ctx := context.Background() + Record(ctx, counter.M(int64(1))) + if r != nil { + ctx = metricskey.WithResource(ctx, *r) + } + Record(ctx, gauge.M(int64(i))) + } + c.validate(t) + }) + } +} + +func TestStackDriverExports(t *testing.T) { + TestOverrideBundleCount = 1 + t.Cleanup(func() { TestOverrideBundleCount = 0 }) + eo := ExporterOptions{ + Domain: servingDomain, + Component: "autoscaler", + ConfigMap: map[string]string{ + BackendDestinationKey: string(stackdriver), + reportingPeriodKey: "1", + stackdriverProjectIDKey: "foobar", + }, + } + + label1 := map[string]string{ + "cluster_name": "test-cluster", + "configuration_name": "config", + "location": "test-location", + "namespace_name": "ns", + "project_id": "foobar", + "revision_name": "revision", + "service_name": "service", + } + label2 := map[string]string{ + "cluster_name": "test-cluster", + "configuration_name": "config2", + "location": "test-location", + "namespace_name": "ns2", + "project_id": "foobar", + "revision_name": "revision2", + "service_name": "service2", + } + batchLabels := map[string]string{ + "namespace_name": "ns2", + "configuration_name": "config2", + "revision_name": "revision2", + "service_name": "service2", + } + harness := []struct { + name string + allowCustomMetrics string + expected []metricExtract 
+ }{{ + name: "Allow custom metrics", + allowCustomMetrics: "true", + expected: []metricExtract{{ + "knative.dev/serving/autoscaler/actual_pods", + label1, + 1, + }, { + "knative.dev/serving/autoscaler/desired_pods", + label2, + 2, + }, { + "custom.googleapis.com/knative.dev/autoscaler/not_ready_pods", + batchLabels, + 3, + }}, + }, { + name: "Don't allow custom metrics", + allowCustomMetrics: "false", + expected: []metricExtract{{ + "knative.dev/serving/autoscaler/actual_pods", + label1, + 1, + }, { + "knative.dev/serving/autoscaler/desired_pods", + label2, + 2, + }}, + }} + + for _, tc := range harness { + t.Run(tc.name, func(t *testing.T) { + eo.ConfigMap[allowStackdriverCustomMetricsKey] = tc.allowCustomMetrics + // Change the cluster name to reinitialize the exporter and pick up a new port. + eo.ConfigMap[stackdriverClusterNameKey] = tc.name + actualPodCountM := stats.Int64( + "actual_pods", + "Number of pods that are allocated currently", + stats.UnitDimensionless) + actualPodsCountView := &view.View{ + Description: "Number of pods that are allocated currently", + Measure: actualPodCountM, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{NamespaceTagKey, ServiceTagKey, ConfigTagKey, RevisionTagKey}, + } + desiredPodCountM := stats.Int64( + "desired_pods", + "Number of pods that are desired", + stats.UnitDimensionless) + desiredPodsCountView := &view.View{ + Description: "Number of pods that are desired", + Measure: desiredPodCountM, + Aggregation: view.LastValue(), + } + notReadyPodCountM := stats.Int64( + "not_ready_pods", + "Number of pods that are not ready", + stats.UnitDimensionless) + customView := &view.View{ + Description: "non-knative-revision metric per KnativeRevisionMetrics", + Measure: notReadyPodCountM, + Aggregation: view.LastValue(), + } + + sdFake := stackDriverFake{t: t} + if err := initStackdriverFake(&sdFake); err != nil { + t.Error("Init stackdriver failed", err) + } + if err := UpdateExporter(context.Background(), eo, 
logtesting.TestLogger(t)); err != nil { + t.Error("UpdateExporter failed", err) + } + + if err := RegisterResourceView(desiredPodsCountView, actualPodsCountView, customView); err != nil { + t.Fatalf("unable to register view: %+v", err) + } + t.Cleanup(func() { + UnregisterResourceView(desiredPodsCountView, actualPodsCountView, customView) + }) + + ctx, err := tag.New(context.Background(), tag.Upsert(NamespaceTagKey, "ns"), + tag.Upsert(ServiceTagKey, "service"), + tag.Upsert(ConfigTagKey, "config"), + tag.Upsert(RevisionTagKey, "revision")) + if err != nil { + t.Fatal("Unable to create tags", err) + } + Record(ctx, actualPodCountM.M(int64(1))) + + r := resource.Resource{ + Type: "testing", + Labels: batchLabels, + } + RecordBatch( + metricskey.WithResource(context.Background(), r), + desiredPodCountM.M(int64(2)), + notReadyPodCountM.M(int64(3))) + + records := []metricExtract{} + loop: + for { + select { + case record := <-sdFake.published: + for _, ts := range record.TimeSeries { + extracted := metricExtract{ + Name: ts.Metric.Type, + Labels: ts.Resource.Labels, + Value: ts.Points[0].Value.GetInt64Value(), + } + // Override 'cluster-name' label to reset to a fixed value + if extracted.Labels["cluster_name"] != "" { + extracted.Labels["cluster_name"] = "test-cluster" + } + records = append(records, extracted) + if strings.HasPrefix(ts.Metric.Type, "knative.dev/") { + if diff := cmp.Diff(ts.Resource.Type, metricskey.ResourceTypeKnativeRevision); diff != "" { + t.Errorf("Incorrect resource type for %q: (-want +got):\n%s", ts.Metric.Type, diff) + } + } + } + if len(records) >= len(tc.expected) { + // There's no way to synchronize on the internal timer used + // by metricsexport.IntervalReader, so shut down the + // exporter after the first report cycle. 
+ FlushExporter() + sdFake.srv.GracefulStop() + break loop + } + case <-time.After(4 * time.Second): + t.Error("Timeout reading records from Stackdriver") + break loop + } + } + if diff := cmp.Diff(tc.expected, records, sortMetrics()); diff != "" { + t.Errorf("Unexpected stackdriver knative exports (-want +got):\n%s", diff) + } + }) + } +} + +type openCensusFake struct { + ocmetrics.UnimplementedMetricsServiceServer + address string + srv *grpc.Server + exports sync.WaitGroup + wg sync.WaitGroup + published chan *ocmetrics.ExportMetricsServiceRequest +} + +func (oc *openCensusFake) start(expectedStreams int) error { + ln, err := net.Listen("tcp", oc.address) + if err != nil { + return err + } + oc.published = make(chan *ocmetrics.ExportMetricsServiceRequest, 100) + oc.srv = grpc.NewServer() + ocmetrics.RegisterMetricsServiceServer(oc.srv, oc) + // Run the server in the background. + oc.wg.Add(1) + go func() { + oc.srv.Serve(ln) + oc.wg.Done() + oc.wg.Wait() + close(oc.published) + }() + oc.exports.Add(expectedStreams) + go oc.stop() + return nil +} + +func (oc *openCensusFake) stop() { + oc.exports.Wait() + oc.srv.Stop() +} + +func (oc *openCensusFake) Export(stream ocmetrics.MetricsService_ExportServer) error { + var streamResource *ocresource.Resource + oc.wg.Add(1) + defer oc.wg.Done() + metricSeen := false + for { + in, err := stream.Recv() + if errors.Is(err, io.EOF) { + return nil + } + if err != nil { + return err + } + if in.Resource != nil { + // The stream is stateful, keep track of the last Resource seen. 
+ streamResource = in.Resource + } + if len(in.Metrics) > 0 { + if in.Resource == nil { + in.Resource = streamResource + } + oc.published <- proto.Clone(in).(*ocmetrics.ExportMetricsServiceRequest) + if !metricSeen { + oc.exports.Done() + metricSeen = true + } + } + } +} + +type stackDriverFake struct { + stackdriverpb.UnimplementedMetricServiceServer + address string + srv *grpc.Server + t *testing.T + published chan *stackdriverpb.CreateTimeSeriesRequest +} + +func (sd *stackDriverFake) start() error { + sd.published = make(chan *stackdriverpb.CreateTimeSeriesRequest, 100) + ln, err := net.Listen("tcp", "localhost:0") + if err != nil { + return err + } + sd.address = ln.Addr().String() + sd.srv = grpc.NewServer() + stackdriverpb.RegisterMetricServiceServer(sd.srv, sd) + // Run the server in the background. + go func() { + sd.srv.Serve(ln) + close(sd.published) + }() + return nil +} + +func (sd *stackDriverFake) CreateTimeSeries(ctx context.Context, req *stackdriverpb.CreateTimeSeriesRequest) (*emptypb.Empty, error) { + sd.published <- req + return &emptypb.Empty{}, nil +} + +func (sd *stackDriverFake) CreateMetricDescriptor(ctx context.Context, req *stackdriverpb.CreateMetricDescriptorRequest) (*metricpb.MetricDescriptor, error) { + return req.MetricDescriptor, nil +} diff --git a/metrics/exporter.go b/metrics/exporter.go index 0c680106ee..b98a18de1f 100644 --- a/metrics/exporter.go +++ b/metrics/exporter.go @@ -23,6 +23,7 @@ import ( "strings" "sync" + "go.opencensus.io/resource" "go.opencensus.io/stats/view" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -161,13 +162,13 @@ func UpdateExporter(ctx context.Context, ops ExporterOptions, logger *zap.Sugare flushGivenExporter(curMetricsExporter) e, f, err := newMetricsExporter(newConfig, logger) if err != nil { - logger.Errorw("Failed to update a new metrics exporter based on metric config", newConfig, zap.Error(err)) + logger.Errorw("Failed to update a new metrics exporter based on metric config", zap.Error(err), 
"config", newConfig) return err } existingConfig := curMetricsConfig curMetricsExporter = e if err := setFactory(f); err != nil { - logger.Errorw("Failed to update metrics factory when loading metric config", newConfig, zap.Error(err)) + logger.Errorw("Failed to update metrics factory when loading metric config", zap.Error(err), "config", newConfig) return err } logger.Infof("Successfully updated the metrics exporter; old config: %v; new config %v", existingConfig, newConfig) @@ -212,7 +213,10 @@ func newMetricsExporter(config *metricsConfig, logger *zap.SugaredLogger) (view. openCensus: newOpenCensusExporter, prometheus: newPrometheusExporter, none: func(*metricsConfig, *zap.SugaredLogger) (view.Exporter, ResourceExporterFactory, error) { - return nil, nil, nil + noneFactory := func(*resource.Resource) (view.Exporter, error) { + return &noneExporter{}, nil + } + return &noneExporter{}, noneFactory, nil }, } @@ -272,3 +276,10 @@ func flushGivenExporter(e view.Exporter) bool { } return false } + +type noneExporter struct { +} + +// ExportView implements view.Exporter as a no-op for the none backend. +func (*noneExporter) ExportView(*view.Data) { +} diff --git a/metrics/exporter_test.go b/metrics/exporter_test.go index 9d692bcbcb..871c8eb4c4 100644 --- a/metrics/exporter_test.go +++ b/metrics/exporter_test.go @@ -18,6 +18,7 @@ import ( "testing" "time" + "golang.org/x/net/context" . 
"knative.dev/pkg/logging/testing" ) @@ -248,7 +249,14 @@ func TestInterleavedExporters(t *testing.T) { func TestFlushExporter(t *testing.T) { // No exporter - no action should be taken - setCurMetricsConfig(nil) + UpdateExporter(context.Background(), ExporterOptions{ + Domain: "test", + Component: "test", + ConfigMap: map[string]string{ + BackendDestinationKey: string(none), + }, + }, TestLogger(t)) + if want, got := false, FlushExporter(); got != want { t.Errorf("Expected %v, got %v.", want, got) } diff --git a/metrics/resource_view_test.go b/metrics/resource_view_test.go index b5ae5f0935..8814447aa8 100644 --- a/metrics/resource_view_test.go +++ b/metrics/resource_view_test.go @@ -17,49 +17,18 @@ limitations under the License. package metrics import ( - "context" - "errors" "fmt" - "io" - "io/ioutil" - "net" - "net/http" - "os" - "sort" - "strings" - "sync" "testing" "time" - sd "contrib.go.opencensus.io/exporter/stackdriver" - ocmetrics "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" - ocresource "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" "go.opencensus.io/resource" "go.opencensus.io/stats" "go.opencensus.io/stats/view" - "go.opencensus.io/tag" "k8s.io/apimachinery/pkg/util/clock" - "k8s.io/apimachinery/pkg/util/wait" - - emptypb "github.com/golang/protobuf/ptypes/empty" - "github.com/google/go-cmp/cmp" - "google.golang.org/api/option" - metricpb "google.golang.org/genproto/googleapis/api/metric" - stackdriverpb "google.golang.org/genproto/googleapis/monitoring/v3" - "google.golang.org/grpc" - proto "google.golang.org/protobuf/proto" - - logtesting "knative.dev/pkg/logging/testing" - "knative.dev/pkg/metrics/metricskey" - "knative.dev/pkg/metrics/metricstest" ) var ( - r = resource.Resource{Labels: map[string]string{"foo": "bar"}} - NamespaceTagKey = tag.MustNewKey(metricskey.LabelNamespaceName) - ServiceTagKey = tag.MustNewKey(metricskey.LabelServiceName) - ConfigTagKey = 
tag.MustNewKey(metricskey.LabelConfigurationName) - RevisionTagKey = tag.MustNewKey(metricskey.LabelRevisionName) + r = resource.Resource{Labels: map[string]string{"foo": "bar"}} ) func TestRegisterResourceView(t *testing.T) { @@ -106,6 +75,14 @@ type testExporter struct { } func TestSetFactory(t *testing.T) { + var oldFactory ResourceExporterFactory + func() { + allMeters.lock.Lock() + defer allMeters.lock.Unlock() + + oldFactory = allMeters.factory + }() + fakeFactory := func(rr *resource.Resource) (view.Exporter, error) { if rr == nil { return &testExporter{}, nil @@ -144,6 +121,8 @@ func TestSetFactory(t *testing.T) { if e.id != "456" { t.Error("Expect id to be 456, instead got", e.id) } + + setFactory(oldFactory) } func TestAllMetersExpiration(t *testing.T) { @@ -243,572 +222,3 @@ func BenchmarkResourceToKey(b *testing.B) { }) } } - -type metricExtract struct { - Name string - Labels map[string]string - Value int64 -} - -func (m metricExtract) Key() string { - return fmt.Sprintf("%s<%s>", m.Name, resource.EncodeLabels(m.Labels)) -} - -func (m metricExtract) String() string { - return fmt.Sprintf("%s:%d", m.Key(), m.Value) -} - -func initSdFake(sdFake *stackDriverFake) error { - if err := sdFake.start(); err != nil { - return err - } - conn, err := grpc.Dial(sdFake.address, grpc.WithInsecure()) - if err != nil { - return err - } - newStackdriverExporterFunc = func(o sd.Options) (view.Exporter, error) { - o.MonitoringClientOptions = append(o.MonitoringClientOptions, option.WithGRPCConn(conn)) - return newOpencensusSDExporter(o) - } - // File: must exist, be json of credentialsFile, and type must be a jwtConfig or oauth2Config - tmp, err := ioutil.TempFile("", "metrics-sd-test") - if err != nil { - return err - } - defer tmp.Close() - credentialsContent := []byte(`{"type": "service_account"}`) - if _, err := tmp.Write(credentialsContent); err != nil { - return err - } - os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", tmp.Name()) - return nil -} - -func sortMetrics() 
cmp.Option { - return cmp.Transformer("Sort", func(in []metricExtract) []string { - out := make([]string, 0, len(in)) - seen := map[string]int{} - for _, m := range in { - // Keep only the newest report for a key - key := m.Key() - if seen[key] == 0 { - out = append(out, m.String()) - seen[key] = len(out) // Store address+1 to avoid doubling first item. - } else { - out[seen[key]-1] = m.String() - } - } - sort.Strings(out) - return out - }) -} - -// Begin table tests for exporters -func TestMetricsExport(t *testing.T) { - TestOverrideBundleCount = 1 - t.Cleanup(func() { TestOverrideBundleCount = 0 }) - ocFake := openCensusFake{address: "localhost:12345"} - sdFake := stackDriverFake{} - prometheusPort := 19090 - configForBackend := func(backend metricsBackend) ExporterOptions { - return ExporterOptions{ - Domain: servingDomain, - Component: testComponent, - PrometheusPort: prometheusPort, - ConfigMap: map[string]string{ - BackendDestinationKey: string(backend), - collectorAddressKey: ocFake.address, - allowStackdriverCustomMetricsKey: "true", - stackdriverCustomMetricSubDomainKey: servingDomain, - reportingPeriodKey: "1", - }, - } - } - - resources := []*resource.Resource{ - { - Type: "revision", - Labels: map[string]string{ - "project": "p1", - "revision": "r1", - }, - }, - { - Type: "revision", - Labels: map[string]string{ - "project": "p1", - "revision": "r2", - }, - }, - } - gauge := stats.Int64("testing/value", "Stored value", stats.UnitDimensionless) - counter := stats.Int64("export counts", "Times through the export", stats.UnitDimensionless) - gaugeView := &view.View{ - Name: "testing/value", - Description: "Test value", - Measure: gauge, - Aggregation: view.LastValue(), - } - resourceCounter := &view.View{ - Name: "resource_global_export_count", - Description: "Count of exports via RegisterResourceView.", - Measure: counter, - Aggregation: view.Count(), - } - globalCounter := &view.View{ - Name: "global_export_counts", - Description: "Count of exports via 
standard OpenCensus view.", - Measure: counter, - Aggregation: view.Count(), - } - - expected := []metricExtract{ - {"knative.dev/serving/testComponent/global_export_counts", map[string]string{}, 2}, - {"knative.dev/serving/testComponent/resource_global_export_count", map[string]string{}, 2}, - {"knative.dev/serving/testComponent/testing/value", map[string]string{"project": "p1", "revision": "r1"}, 0}, - {"knative.dev/serving/testComponent/testing/value", map[string]string{"project": "p1", "revision": "r2"}, 1}, - } - - harnesses := []struct { - name string - init func() error - validate func(t *testing.T) - }{{ - name: "Prometheus", - init: func() error { - if err := UpdateExporter(context.Background(), configForBackend(prometheus), logtesting.TestLogger(t)); err != nil { - return err - } - // Wait for the webserver to actually start serving metrics - return wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { - resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", prometheusPort)) - return err == nil && resp.StatusCode == http.StatusOK, nil - }) - }, - validate: func(t *testing.T) { - metricstest.EnsureRecorded() - resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", prometheusPort)) - if err != nil { - t.Fatalf("failed to fetch prometheus metrics: %+v", err) - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - t.Fatalf("failed to read prometheus response: %+v", err) - } - want := `# HELP testComponent_global_export_counts Count of exports via standard OpenCensus view. -# TYPE testComponent_global_export_counts counter -testComponent_global_export_counts 2 -# HELP testComponent_resource_global_export_count Count of exports via RegisterResourceView. 
-# TYPE testComponent_resource_global_export_count counter -testComponent_resource_global_export_count 2 -# HELP testComponent_testing_value Test value -# TYPE testComponent_testing_value gauge -testComponent_testing_value{project="p1",revision="r1"} 0 -testComponent_testing_value{project="p1",revision="r2"} 1 -` - if diff := cmp.Diff(want, string(body)); diff != "" { - t.Errorf("Unexpected prometheus output (-want +got):\n%s", diff) - } - }, - }, { - name: "OpenCensus", - init: func() error { - if err := ocFake.start(len(resources) + 1); err != nil { - return err - } - t.Log("Created exporter at", ocFake.address) - return UpdateExporter(context.Background(), configForBackend(openCensus), logtesting.TestLogger(t)) - }, - validate: func(t *testing.T) { - t.Skip("Skipped because of excessive flakiness, see: https://github.com/knative/pkg/issues/1672") - - // We unregister the views because this is one of two ways to flush - // the internal aggregation buffers; the other is to have the - // internal reporting period duration tick, which is at least - // [new duration] in the future. 
- view.Unregister(globalCounter) - UnregisterResourceView(gaugeView, resourceCounter) - - records := []metricExtract{} - loop: - for { - select { - case record := <-ocFake.published: - if record == nil { - continue loop - } - for _, m := range record.Metrics { - if len(m.Timeseries) > 0 { - labels := map[string]string{} - if record.Resource != nil { - labels = record.Resource.Labels - } - records = append(records, metricExtract{ - Name: m.MetricDescriptor.Name, - Labels: labels, - Value: m.Timeseries[0].Points[0].GetInt64Value(), - }) - } - } - if len(records) >= len(expected) { - break loop - } - case <-time.After(4 * time.Second): - t.Error("Timeout reading input") - break loop - } - } - - if diff := cmp.Diff(expected, records, sortMetrics()); diff != "" { - t.Errorf("Unexpected OpenCensus exports (-want +got):\n%s", diff) - } - }, - }, { - name: "Stackdriver", - init: func() error { - if err := initSdFake(&sdFake); err != nil { - return err - } - return UpdateExporter(context.Background(), configForBackend(stackdriver), logtesting.TestLogger(t)) - }, - validate: func(t *testing.T) { - records := []metricExtract{} - for record := range sdFake.published { - for _, ts := range record.TimeSeries { - name := ts.Metric.Type[len("custom.googleapis.com/"):] - records = append(records, metricExtract{ - Name: name, - Labels: ts.Resource.Labels, - Value: ts.Points[0].Value.GetInt64Value(), - }) - } - if len(records) >= 4 { - // There's no way to synchronize on the internal timer used - // by metricsexport.IntervalReader, so shut down the - // exporter after the first report cycle. 
- FlushExporter() - sdFake.srv.GracefulStop() - } - } - if diff := cmp.Diff(expected, records, sortMetrics()); diff != "" { - t.Errorf("Unexpected Stackdriver exports (-want +got):\n%s", diff) - } - }, - }} - - for _, c := range harnesses { - t.Run(c.name, func(t *testing.T) { - ClearMetersForTest() - sdFake.t = t - if err := c.init(); err != nil { - t.Fatalf("unable to init: %+v", err) - } - - view.Register(globalCounter) - if err := RegisterResourceView(gaugeView, resourceCounter); err != nil { - t.Fatal("Unable to register views:", err) - } - t.Cleanup(func() { - view.Unregister(globalCounter) - UnregisterResourceView(gaugeView, resourceCounter) - }) - - for i, r := range resources { - ctx := context.Background() - Record(ctx, counter.M(int64(1))) - if r != nil { - ctx = metricskey.WithResource(ctx, *r) - } - Record(ctx, gauge.M(int64(i))) - } - c.validate(t) - }) - } -} - -func TestStackDriverExports(t *testing.T) { - TestOverrideBundleCount = 1 - t.Cleanup(func() { TestOverrideBundleCount = 0 }) - eo := ExporterOptions{ - Domain: servingDomain, - Component: "autoscaler", - ConfigMap: map[string]string{ - BackendDestinationKey: string(stackdriver), - reportingPeriodKey: "1", - stackdriverProjectIDKey: "foobar", - }, - } - - label1 := map[string]string{ - "cluster_name": "test-cluster", - "configuration_name": "config", - "location": "test-location", - "namespace_name": "ns", - "project_id": "foobar", - "revision_name": "revision", - "service_name": "service", - } - label2 := map[string]string{ - "cluster_name": "test-cluster", - "configuration_name": "config2", - "location": "test-location", - "namespace_name": "ns2", - "project_id": "foobar", - "revision_name": "revision2", - "service_name": "service2", - } - batchLabels := map[string]string{ - "namespace_name": "ns2", - "configuration_name": "config2", - "revision_name": "revision2", - "service_name": "service2", - } - harness := []struct { - name string - allowCustomMetrics string - expected []metricExtract 
- }{{ - name: "Allow custom metrics", - allowCustomMetrics: "true", - expected: []metricExtract{ - { - "knative.dev/serving/autoscaler/actual_pods", - label1, - 1, - }, - { - "knative.dev/serving/autoscaler/desired_pods", - label2, - 2, - }, - { - "custom.googleapis.com/knative.dev/autoscaler/not_ready_pods", - batchLabels, - 3, - }, - }, - }, { - name: "Don't allow custom metrics", - allowCustomMetrics: "false", - expected: []metricExtract{ - { - "knative.dev/serving/autoscaler/actual_pods", - label1, - 1, - }, - { - "knative.dev/serving/autoscaler/desired_pods", - label2, - 2, - }, - }, - }} - - for _, tc := range harness { - t.Run(tc.name, func(t *testing.T) { - eo.ConfigMap[allowStackdriverCustomMetricsKey] = tc.allowCustomMetrics - // Change the cluster name to reinitialize the exporter and pick up a new port. - eo.ConfigMap[stackdriverClusterNameKey] = tc.name - actualPodCountM := stats.Int64( - "actual_pods", - "Number of pods that are allocated currently", - stats.UnitDimensionless) - actualPodsCountView := &view.View{ - Description: "Number of pods that are allocated currently", - Measure: actualPodCountM, - Aggregation: view.LastValue(), - TagKeys: []tag.Key{NamespaceTagKey, ServiceTagKey, ConfigTagKey, RevisionTagKey}, - } - desiredPodCountM := stats.Int64( - "desired_pods", - "Number of pods that are desired", - stats.UnitDimensionless) - desiredPodsCountView := &view.View{ - Description: "Number of pods that are desired", - Measure: desiredPodCountM, - Aggregation: view.LastValue(), - } - notReadyPodCountM := stats.Int64( - "not_ready_pods", - "Number of pods that are not ready", - stats.UnitDimensionless) - customView := &view.View{ - Description: "non-knative-revision metric per KnativeRevisionMetrics", - Measure: notReadyPodCountM, - Aggregation: view.LastValue(), - } - - sdFake := stackDriverFake{t: t} - if err := initSdFake(&sdFake); err != nil { - t.Error("Init stackdriver failed", err) - } - if err := UpdateExporter(context.Background(), eo, 
logtesting.TestLogger(t)); err != nil { - t.Error("UpdateExporter failed", err) - } - - if err := RegisterResourceView(desiredPodsCountView, actualPodsCountView, customView); err != nil { - t.Fatalf("unable to register view: %+v", err) - } - t.Cleanup(func() { - UnregisterResourceView(desiredPodsCountView, actualPodsCountView, customView) - }) - - ctx, err := tag.New(context.Background(), tag.Upsert(NamespaceTagKey, "ns"), - tag.Upsert(ServiceTagKey, "service"), - tag.Upsert(ConfigTagKey, "config"), - tag.Upsert(RevisionTagKey, "revision")) - if err != nil { - t.Fatal("Unable to create tags", err) - } - Record(ctx, actualPodCountM.M(int64(1))) - - r := resource.Resource{ - Type: "testing", - Labels: batchLabels, - } - RecordBatch( - metricskey.WithResource(context.Background(), r), - desiredPodCountM.M(int64(2)), - notReadyPodCountM.M(int64(3))) - - records := []metricExtract{} - loop: - for { - select { - case record := <-sdFake.published: - for _, ts := range record.TimeSeries { - extracted := metricExtract{ - Name: ts.Metric.Type, - Labels: ts.Resource.Labels, - Value: ts.Points[0].Value.GetInt64Value(), - } - // Override 'cluster-name' label to reset to a fixed value - if extracted.Labels["cluster_name"] != "" { - extracted.Labels["cluster_name"] = "test-cluster" - } - records = append(records, extracted) - if strings.HasPrefix(ts.Metric.Type, "knative.dev/") { - if diff := cmp.Diff(ts.Resource.Type, metricskey.ResourceTypeKnativeRevision); diff != "" { - t.Errorf("Incorrect resource type for %q: (-want +got):\n%s", ts.Metric.Type, diff) - } - } - } - if len(records) >= len(tc.expected) { - // There's no way to synchronize on the internal timer used - // by metricsexport.IntervalReader, so shut down the - // exporter after the first report cycle. 
- FlushExporter() - sdFake.srv.GracefulStop() - break loop - } - case <-time.After(4 * time.Second): - t.Error("Timeout reading records from Stackdriver") - break loop - } - } - if diff := cmp.Diff(tc.expected, records, sortMetrics()); diff != "" { - t.Errorf("Unexpected stackdriver knative exports (-want +got):\n%s", diff) - } - }) - } -} - -type openCensusFake struct { - ocmetrics.UnimplementedMetricsServiceServer - address string - srv *grpc.Server - exports sync.WaitGroup - wg sync.WaitGroup - published chan *ocmetrics.ExportMetricsServiceRequest -} - -func (oc *openCensusFake) start(expectedStreams int) error { - ln, err := net.Listen("tcp", oc.address) - if err != nil { - return err - } - oc.published = make(chan *ocmetrics.ExportMetricsServiceRequest, 100) - oc.srv = grpc.NewServer() - ocmetrics.RegisterMetricsServiceServer(oc.srv, oc) - // Run the server in the background. - oc.wg.Add(1) - go func() { - oc.srv.Serve(ln) - oc.wg.Done() - oc.wg.Wait() - close(oc.published) - }() - oc.exports.Add(expectedStreams) - go oc.stop() - return nil -} - -func (oc *openCensusFake) stop() { - oc.exports.Wait() - oc.srv.Stop() -} - -func (oc *openCensusFake) Export(stream ocmetrics.MetricsService_ExportServer) error { - var streamResource *ocresource.Resource - oc.wg.Add(1) - defer oc.wg.Done() - metricSeen := false - for { - in, err := stream.Recv() - if errors.Is(err, io.EOF) { - return nil - } - if err != nil { - return err - } - if in.Resource != nil { - // The stream is stateful, keep track of the last Resource seen. 
- streamResource = in.Resource - } - if len(in.Metrics) > 0 { - if in.Resource == nil { - in.Resource = streamResource - } - oc.published <- proto.Clone(in).(*ocmetrics.ExportMetricsServiceRequest) - if !metricSeen { - oc.exports.Done() - metricSeen = true - } - } - } -} - -type stackDriverFake struct { - stackdriverpb.UnimplementedMetricServiceServer - address string - srv *grpc.Server - t *testing.T - published chan *stackdriverpb.CreateTimeSeriesRequest -} - -func (sd *stackDriverFake) start() error { - sd.published = make(chan *stackdriverpb.CreateTimeSeriesRequest, 100) - ln, err := net.Listen("tcp", "localhost:0") - if err != nil { - return err - } - sd.address = ln.Addr().String() - sd.srv = grpc.NewServer() - stackdriverpb.RegisterMetricServiceServer(sd.srv, sd) - // Run the server in the background. - go func() { - sd.srv.Serve(ln) - close(sd.published) - }() - return nil -} - -func (sd *stackDriverFake) CreateTimeSeries(ctx context.Context, req *stackdriverpb.CreateTimeSeriesRequest) (*emptypb.Empty, error) { - sd.published <- req - return &emptypb.Empty{}, nil -} - -func (sd *stackDriverFake) CreateMetricDescriptor(ctx context.Context, req *stackdriverpb.CreateMetricDescriptorRequest) (*metricpb.MetricDescriptor, error) { - return req.MetricDescriptor, nil -}