From 14a8dae026a2360a729c9f022274347d16436fdb Mon Sep 17 00:00:00 2001 From: Evan Anderson Date: Thu, 10 Dec 2020 19:50:25 -0800 Subject: [PATCH 1/6] Copy resource_view_test to e2e_test --- metrics/{resource_view_test.go => e2e_test.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename metrics/{resource_view_test.go => e2e_test.go} (100%) diff --git a/metrics/resource_view_test.go b/metrics/e2e_test.go similarity index 100% rename from metrics/resource_view_test.go rename to metrics/e2e_test.go From 2f6ecdb57f2c8be8c1d352286d92db9037180069 Mon Sep 17 00:00:00 2001 From: Evan Anderson Date: Thu, 10 Dec 2020 19:50:56 -0800 Subject: [PATCH 2/6] Restore metrics/resource_view_test.go --- metrics/resource_view_test.go | 798 ++++++++++++++++++++++++++++++++++ 1 file changed, 798 insertions(+) create mode 100644 metrics/resource_view_test.go diff --git a/metrics/resource_view_test.go b/metrics/resource_view_test.go new file mode 100644 index 0000000000..0cc5dbced1 --- /dev/null +++ b/metrics/resource_view_test.go @@ -0,0 +1,798 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "context" + "errors" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "os" + "sort" + "strings" + "sync" + "testing" + "time" + + sd "contrib.go.opencensus.io/exporter/stackdriver" + ocmetrics "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" + ocresource "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" + "go.opencensus.io/resource" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/wait" + + emptypb "github.com/golang/protobuf/ptypes/empty" + "github.com/google/go-cmp/cmp" + "google.golang.org/api/option" + metricpb "google.golang.org/genproto/googleapis/api/metric" + stackdriverpb "google.golang.org/genproto/googleapis/monitoring/v3" + "google.golang.org/grpc" + proto "google.golang.org/protobuf/proto" + + logtesting "knative.dev/pkg/logging/testing" + "knative.dev/pkg/metrics/metricskey" + "knative.dev/pkg/metrics/metricstest" +) + +var ( + r = resource.Resource{Labels: map[string]string{"foo": "bar"}} + NamespaceTagKey = tag.MustNewKey(metricskey.LabelNamespaceName) + ServiceTagKey = tag.MustNewKey(metricskey.LabelServiceName) + ConfigTagKey = tag.MustNewKey(metricskey.LabelConfigurationName) + RevisionTagKey = tag.MustNewKey(metricskey.LabelRevisionName) +) + +func TestRegisterResourceView(t *testing.T) { + meter := meterExporterForResource(&r).m + + m := stats.Int64("testView_sum", "", stats.UnitDimensionless) + view := view.View{Name: "testView", Measure: m, Aggregation: view.Sum()} + + err := RegisterResourceView(&view) + if err != nil { + t.Fatal("RegisterResourceView =", err) + } + t.Cleanup(func() { UnregisterResourceView(&view) }) + + viewToFind := defaultMeter.m.Find("testView") + if viewToFind == nil || viewToFind.Name != "testView" { + t.Error("Registered view should be found in default meter, instead got", viewToFind) + } + + viewToFind = meter.Find("testView") + if viewToFind == nil || viewToFind.Name != "testView" { + t.Error("Registered view should be found in new meter, instead got", viewToFind) + } +} + +func TestOptionForResource(t *testing.T) { + option, err1 := optionForResource(&r) + if err1 != nil { + t.Error("Should succeed getting option, instead got error", err1) + } + optionAgain, err2 := optionForResource(&r) + if err2 != nil { + t.Error("Should succeed getting option, instead got error", err2) + } + + if fmt.Sprintf("%v", optionAgain) != fmt.Sprintf("%v", option) { + t.Errorf("Option for the same resource should not be recreated, instead got %v and %v", optionAgain, option) + } +} + +type testExporter struct { + view.Exporter + id string +} + +func TestSetFactory(t *testing.T) { + fakeFactory := func(rr *resource.Resource) (view.Exporter, error) { + if rr == nil { + return &testExporter{}, nil + } + + return &testExporter{id: rr.Labels["id"]}, nil + } + + resource123 := r + resource123.Labels["id"] = "123" + + setFactory(fakeFactory) + // Create the new meter and apply the factory + _, err := optionForResource(&resource123) + if err != nil { + t.Error("Should succeed getting option, instead got error", err) + } + + // Now get the exporter and verify the id + me := meterExporterForResource(&resource123) + e := me.e.(*testExporter) + if e.id != "123" { + t.Error("Expect id to be 123, instead got", e.id) + } + + resource456 := r + resource456.Labels["id"] = "456" + // Create the new meter and apply the factory + _, err = optionForResource(&resource456) + if err != nil { + t.Error("Should succeed getting option, instead got error", err) + } + + me = meterExporterForResource(&resource456) + e = me.e.(*testExporter) + if e.id != "456" { + t.Error("Expect id to be 456, instead got", e.id) + } +} + +func TestAllMetersExpiration(t *testing.T) { + allMeters.clock = clock.Clock(clock.NewFakeClock(time.Now())) + var fakeClock *clock.FakeClock = allMeters.clock.(*clock.FakeClock) + ClearMetersForTest() // t+0m + + // Add resource123 + resource123 := r + resource123.Labels["id"] = "123" + _, err := optionForResource(&resource123) + if err != nil { + t.Error("Should succeed getting option, instead got error ", err) + } + // (123=0m, 456=Inf) + + // Bump time to make resource123's expiry offset from resource456 + fakeClock.Step(90 * time.Second) // t+1.5m + // (123=0m, 456=Inf) + + // Add 456 + resource456 := r + resource456.Labels["id"] = "456" + _, err = optionForResource(&resource456) + if err != nil { + t.Error("Should succeed getting option, instead got error ", err) + } + allMeters.lock.Lock() + if len(allMeters.meters) != 3 { + t.Errorf("len(allMeters)=%d, want: 3", len(allMeters.meters)) + } + allMeters.lock.Unlock() + // (123=1.5m, 456=0m) + + // Warm up the older entry + fakeClock.Step(90 * time.Second) //t+3m + // (123=4.5m, 456=3m) + + // Refresh the first entry + _, err = optionForResource(&resource123) + if err != nil { + t.Error("Should succeed getting option, instead got error ", err) + } + // (123=0, 456=1.5m) + + // Expire the second entry + fakeClock.Step(9 * time.Minute) // t+12m + time.Sleep(time.Second) // Wait a second on the wallclock, so that the cleanup thread has time to finish a loop + allMeters.lock.Lock() + if len(allMeters.meters) != 2 { + t.Errorf("len(allMeters)=%d, want: 2", len(allMeters.meters)) + } + allMeters.lock.Unlock() + // (123=9m, 456=10.5m) + // non-expiring defaultMeter was just tested + + // Add resource789 + resource789 := r + resource789.Labels["id"] = "789" + _, err = optionForResource(&resource789) + if err != nil { + t.Error("Should succeed getting option, instead got error ", err) + } + // (123=9m, 456=evicted, 789=0m) +} + +func TestResourceAsString(t *testing.T) { + r1 := &resource.Resource{Type: "foobar", Labels: map[string]string{"k1": "v1", "k3": "v3", "k2": "v2"}} + r2 := &resource.Resource{Type: "foobar", Labels: map[string]string{"k2": "v2", "k3": "v3", "k1": "v1"}} + r3 := &resource.Resource{Type: "foobar", Labels: map[string]string{"k1": "v1", "k2": "v2", "k4": "v4"}} + + // Test 5 time since the iteration could be random. + for i := 0; i < 5; i++ { + + if s1, s2 := resourceToKey(r1), resourceToKey(r2); s1 != s2 { + t.Errorf("Expect same resources, but got %q and %q", s1, s2) + } + } + + if s1, s3 := resourceToKey(r1), resourceToKey(r3); s1 == s3 { + t.Error("Expect different resources, but got the same", s1) + } +} + +type metricExtract struct { + Name string + Labels map[string]string + Value int64 +} + +func (m metricExtract) Key() string { + return fmt.Sprintf("%s<%s>", m.Name, resource.EncodeLabels(m.Labels)) +} + +func (m metricExtract) String() string { + return fmt.Sprintf("%s:%d", m.Key(), m.Value) +} + +func initSdFake(sdFake *stackDriverFake) error { + if err := sdFake.start(); err != nil { + return err + } + conn, err := grpc.Dial(sdFake.address, grpc.WithInsecure()) + if err != nil { + return err + } + newStackdriverExporterFunc = func(o sd.Options) (view.Exporter, error) { + o.MonitoringClientOptions = append(o.MonitoringClientOptions, option.WithGRPCConn(conn)) + return newOpencensusSDExporter(o) + } + // File: must exist, be json of credentialsFile, and type must be a jwtConfig or oauth2Config + tmp, err := ioutil.TempFile("", "metrics-sd-test") + if err != nil { + return err + } + defer tmp.Close() + credentialsContent := []byte(`{"type": "service_account"}`) + if _, err := tmp.Write(credentialsContent); err != nil { + return err + } + os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", tmp.Name()) + return nil +} + +func sortMetrics() cmp.Option { + return cmp.Transformer("Sort", func(in []metricExtract) []string { + out := make([]string, 0, len(in)) + seen := map[string]int{} + for _, m := range in { + // Keep only the newest report for a key + key := m.Key() + if seen[key] == 0 { + out = append(out, m.String()) + seen[key] = len(out) // Store address+1 to avoid doubling first item. + } else { + out[seen[key]-1] = m.String() + } + } + sort.Strings(out) + return out + }) +} + +// Begin table tests for exporters +func TestMetricsExport(t *testing.T) { + TestOverrideBundleCount = 1 + t.Cleanup(func() { TestOverrideBundleCount = 0 }) + ocFake := openCensusFake{address: "localhost:12345"} + sdFake := stackDriverFake{} + prometheusPort := 19090 + configForBackend := func(backend metricsBackend) ExporterOptions { + return ExporterOptions{ + Domain: servingDomain, + Component: testComponent, + PrometheusPort: prometheusPort, + ConfigMap: map[string]string{ + BackendDestinationKey: string(backend), + collectorAddressKey: ocFake.address, + allowStackdriverCustomMetricsKey: "true", + stackdriverCustomMetricSubDomainKey: servingDomain, + reportingPeriodKey: "1", + }, + } + } + + resources := []*resource.Resource{ + { + Type: "revision", + Labels: map[string]string{ + "project": "p1", + "revision": "r1", + }, + }, + { + Type: "revision", + Labels: map[string]string{ + "project": "p1", + "revision": "r2", + }, + }, + } + gauge := stats.Int64("testing/value", "Stored value", stats.UnitDimensionless) + counter := stats.Int64("export counts", "Times through the export", stats.UnitDimensionless) + gaugeView := &view.View{ + Name: "testing/value", + Description: "Test value", + Measure: gauge, + Aggregation: view.LastValue(), + } + resourceCounter := &view.View{ + Name: "resource_global_export_count", + Description: "Count of exports via RegisterResourceView.", + Measure: counter, + Aggregation: view.Count(), + } + globalCounter := &view.View{ + Name: "global_export_counts", + Description: "Count of exports via standard OpenCensus view.", + Measure: counter, + Aggregation: view.Count(), + } + + expected := []metricExtract{ + {"knative.dev/serving/testComponent/global_export_counts", map[string]string{}, 2}, + {"knative.dev/serving/testComponent/resource_global_export_count", map[string]string{}, 2}, + {"knative.dev/serving/testComponent/testing/value", map[string]string{"project": "p1", "revision": "r1"}, 0}, + {"knative.dev/serving/testComponent/testing/value", map[string]string{"project": "p1", "revision": "r2"}, 1}, + } + + harnesses := []struct { + name string + init func() error + validate func(t *testing.T) + }{{ + name: "Prometheus", + init: func() error { + if err := UpdateExporter(context.Background(), configForBackend(prometheus), logtesting.TestLogger(t)); err != nil { + return err + } + // Wait for the webserver to actually start serving metrics + return wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", prometheusPort)) + return err == nil && resp.StatusCode == http.StatusOK, nil + }) + }, + validate: func(t *testing.T) { + metricstest.EnsureRecorded() + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", prometheusPort)) + if err != nil { + t.Fatalf("failed to fetch prometheus metrics: %+v", err) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("failed to read prometheus response: %+v", err) + } + want := `# HELP testComponent_global_export_counts Count of exports via standard OpenCensus view. +# TYPE testComponent_global_export_counts counter +testComponent_global_export_counts 2 +# HELP testComponent_resource_global_export_count Count of exports via RegisterResourceView. +# TYPE testComponent_resource_global_export_count counter +testComponent_resource_global_export_count 2 +# HELP testComponent_testing_value Test value +# TYPE testComponent_testing_value gauge +testComponent_testing_value{project="p1",revision="r1"} 0 +testComponent_testing_value{project="p1",revision="r2"} 1 +` + if diff := cmp.Diff(want, string(body)); diff != "" { + t.Errorf("Unexpected prometheus output (-want +got):\n%s", diff) + } + }, + }, { + name: "OpenCensus", + init: func() error { + if err := ocFake.start(len(resources) + 1); err != nil { + return err + } + t.Log("Created exporter at", ocFake.address) + return UpdateExporter(context.Background(), configForBackend(openCensus), logtesting.TestLogger(t)) + }, + validate: func(t *testing.T) { + t.Skip("Skipped because of excessive flakiness, see: https://github.com/knative/pkg/issues/1672") + + // We unregister the views because this is one of two ways to flush + // the internal aggregation buffers; the other is to have the + // internal reporting period duration tick, which is at least + // [new duration] in the future. + view.Unregister(globalCounter) + UnregisterResourceView(gaugeView, resourceCounter) + + records := []metricExtract{} + loop: + for { + select { + case record := <-ocFake.published: + if record == nil { + continue loop + } + for _, m := range record.Metrics { + if len(m.Timeseries) > 0 { + labels := map[string]string{} + if record.Resource != nil { + labels = record.Resource.Labels + } + records = append(records, metricExtract{ + Name: m.MetricDescriptor.Name, + Labels: labels, + Value: m.Timeseries[0].Points[0].GetInt64Value(), + }) + } + } + if len(records) >= len(expected) { + break loop + } + case <-time.After(4 * time.Second): + t.Error("Timeout reading input") + break loop + } + } + + if diff := cmp.Diff(expected, records, sortMetrics()); diff != "" { + t.Errorf("Unexpected OpenCensus exports (-want +got):\n%s", diff) + } + }, + }, { + name: "Stackdriver", + init: func() error { + if err := initSdFake(&sdFake); err != nil { + return err + } + return UpdateExporter(context.Background(), configForBackend(stackdriver), logtesting.TestLogger(t)) + }, + validate: func(t *testing.T) { + records := []metricExtract{} + for record := range sdFake.published { + for _, ts := range record.TimeSeries { + name := ts.Metric.Type[len("custom.googleapis.com/"):] + records = append(records, metricExtract{ + Name: name, + Labels: ts.Resource.Labels, + Value: ts.Points[0].Value.GetInt64Value(), + }) + } + if len(records) >= 4 { + // There's no way to synchronize on the internal timer used + // by metricsexport.IntervalReader, so shut down the + // exporter after the first report cycle. + FlushExporter() + sdFake.srv.GracefulStop() + } + } + if diff := cmp.Diff(expected, records, sortMetrics()); diff != "" { + t.Errorf("Unexpected Stackdriver exports (-want +got):\n%s", diff) + } + }, + }} + + for _, c := range harnesses { + t.Run(c.name, func(t *testing.T) { + ClearMetersForTest() + sdFake.t = t + if err := c.init(); err != nil { + t.Fatalf("unable to init: %+v", err) + } + + view.Register(globalCounter) + if err := RegisterResourceView(gaugeView, resourceCounter); err != nil { + t.Fatal("Unable to register views:", err) + } + t.Cleanup(func() { + view.Unregister(globalCounter) + UnregisterResourceView(gaugeView, resourceCounter) + }) + + for i, r := range resources { + ctx := context.Background() + Record(ctx, counter.M(int64(1))) + if r != nil { + ctx = metricskey.WithResource(ctx, *r) + } + Record(ctx, gauge.M(int64(i))) + } + c.validate(t) + }) + } +} + +func TestStackDriverExports(t *testing.T) { + TestOverrideBundleCount = 1 + t.Cleanup(func() { TestOverrideBundleCount = 0 }) + eo := ExporterOptions{ + Domain: servingDomain, + Component: "autoscaler", + ConfigMap: map[string]string{ + BackendDestinationKey: string(stackdriver), + reportingPeriodKey: "1", + stackdriverProjectIDKey: "foobar", + }, + } + + label1 := map[string]string{ + "cluster_name": "test-cluster", + "configuration_name": "config", + "location": "test-location", + "namespace_name": "ns", + "project_id": "foobar", + "revision_name": "revision", + "service_name": "service", + } + label2 := map[string]string{ + "cluster_name": "test-cluster", + "configuration_name": "config2", + "location": "test-location", + "namespace_name": "ns2", + "project_id": "foobar", + "revision_name": "revision2", + "service_name": "service2", + } + batchLabels := map[string]string{ + "namespace_name": "ns2", + "configuration_name": "config2", + "revision_name": "revision2", + "service_name": "service2", + } + harness := []struct { + name string + allowCustomMetrics string + expected []metricExtract + }{{ + name: "Allow custom metrics", + allowCustomMetrics: "true", + expected: []metricExtract{ + { + "knative.dev/serving/autoscaler/actual_pods", + label1, + 1, + }, + { + "knative.dev/serving/autoscaler/desired_pods", + label2, + 2, + }, + { + "custom.googleapis.com/knative.dev/autoscaler/not_ready_pods", + batchLabels, + 3, + }, + }, + }, { + name: "Don't allow custom metrics", + allowCustomMetrics: "false", + expected: []metricExtract{ + { + "knative.dev/serving/autoscaler/actual_pods", + label1, + 1, + }, + { + "knative.dev/serving/autoscaler/desired_pods", + label2, + 2, + }, + }, + }} + + for _, tc := range harness { + t.Run(tc.name, func(t *testing.T) { + eo.ConfigMap[allowStackdriverCustomMetricsKey] = tc.allowCustomMetrics + // Change the cluster name to reinitialize the exporter and pick up a new port. + eo.ConfigMap[stackdriverClusterNameKey] = tc.name + actualPodCountM := stats.Int64( + "actual_pods", + "Number of pods that are allocated currently", + stats.UnitDimensionless) + actualPodsCountView := &view.View{ + Description: "Number of pods that are allocated currently", + Measure: actualPodCountM, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{NamespaceTagKey, ServiceTagKey, ConfigTagKey, RevisionTagKey}, + } + desiredPodCountM := stats.Int64( + "desired_pods", + "Number of pods that are desired", + stats.UnitDimensionless) + desiredPodsCountView := &view.View{ + Description: "Number of pods that are desired", + Measure: desiredPodCountM, + Aggregation: view.LastValue(), + } + notReadyPodCountM := stats.Int64( + "not_ready_pods", + "Number of pods that are not ready", + stats.UnitDimensionless) + customView := &view.View{ + Description: "non-knative-revision metric per KnativeRevisionMetrics", + Measure: notReadyPodCountM, + Aggregation: view.LastValue(), + } + + sdFake := stackDriverFake{t: t} + if err := initSdFake(&sdFake); err != nil { + t.Error("Init stackdriver failed", err) + } + if err := UpdateExporter(context.Background(), eo, logtesting.TestLogger(t)); err != nil { + t.Error("UpdateExporter failed", err) + } + + if err := RegisterResourceView(desiredPodsCountView, actualPodsCountView, customView); err != nil { + t.Fatalf("unable to register view: %+v", err) + } + t.Cleanup(func() { + UnregisterResourceView(desiredPodsCountView, actualPodsCountView, customView) + }) + + ctx, err := tag.New(context.Background(), tag.Upsert(NamespaceTagKey, "ns"), + tag.Upsert(ServiceTagKey, "service"), + tag.Upsert(ConfigTagKey, "config"), + tag.Upsert(RevisionTagKey, "revision")) + if err != nil { + t.Fatal("Unable to create tags", err) + } + Record(ctx, actualPodCountM.M(int64(1))) + + r := resource.Resource{ + Type: "testing", + Labels: batchLabels, + } + RecordBatch( + metricskey.WithResource(context.Background(), r), + desiredPodCountM.M(int64(2)), + notReadyPodCountM.M(int64(3))) + + records := []metricExtract{} + loop: + for { + select { + case record := <-sdFake.published: + for _, ts := range record.TimeSeries { + extracted := metricExtract{ + Name: ts.Metric.Type, + Labels: ts.Resource.Labels, + Value: ts.Points[0].Value.GetInt64Value(), + } + // Override 'cluster-name' label to reset to a fixed value + if extracted.Labels["cluster_name"] != "" { + extracted.Labels["cluster_name"] = "test-cluster" + } + records = append(records, extracted) + if strings.HasPrefix(ts.Metric.Type, "knative.dev/") { + if diff := cmp.Diff(ts.Resource.Type, metricskey.ResourceTypeKnativeRevision); diff != "" { + t.Errorf("Incorrect resource type for %q: (-want +got):\n%s", ts.Metric.Type, diff) + } + } + } + if len(records) >= len(tc.expected) { + // There's no way to synchronize on the internal timer used + // by metricsexport.IntervalReader, so shut down the + // exporter after the first report cycle. + FlushExporter() + sdFake.srv.GracefulStop() + break loop + } + case <-time.After(4 * time.Second): + t.Error("Timeout reading records from Stackdriver") + break loop + } + } + if diff := cmp.Diff(tc.expected, records, sortMetrics()); diff != "" { + t.Errorf("Unexpected stackdriver knative exports (-want +got):\n%s", diff) + } + }) + } +} + +type openCensusFake struct { + ocmetrics.UnimplementedMetricsServiceServer + address string + srv *grpc.Server + exports sync.WaitGroup + wg sync.WaitGroup + published chan *ocmetrics.ExportMetricsServiceRequest +} + +func (oc *openCensusFake) start(expectedStreams int) error { + ln, err := net.Listen("tcp", oc.address) + if err != nil { + return err + } + oc.published = make(chan *ocmetrics.ExportMetricsServiceRequest, 100) + oc.srv = grpc.NewServer() + ocmetrics.RegisterMetricsServiceServer(oc.srv, oc) + // Run the server in the background. + oc.wg.Add(1) + go func() { + oc.srv.Serve(ln) + oc.wg.Done() + oc.wg.Wait() + close(oc.published) + }() + oc.exports.Add(expectedStreams) + go oc.stop() + return nil +} + +func (oc *openCensusFake) stop() { + oc.exports.Wait() + oc.srv.Stop() +} + +func (oc *openCensusFake) Export(stream ocmetrics.MetricsService_ExportServer) error { + var streamResource *ocresource.Resource + oc.wg.Add(1) + defer oc.wg.Done() + metricSeen := false + for { + in, err := stream.Recv() + if errors.Is(err, io.EOF) { + return nil + } + if err != nil { + return err + } + if in.Resource != nil { + // The stream is stateful, keep track of the last Resource seen. + streamResource = in.Resource + } + if len(in.Metrics) > 0 { + if in.Resource == nil { + in.Resource = streamResource + } + oc.published <- proto.Clone(in).(*ocmetrics.ExportMetricsServiceRequest) + if !metricSeen { + oc.exports.Done() + metricSeen = true + } + } + } +} + +type stackDriverFake struct { + stackdriverpb.UnimplementedMetricServiceServer + address string + srv *grpc.Server + t *testing.T + published chan *stackdriverpb.CreateTimeSeriesRequest +} + +func (sd *stackDriverFake) start() error { + sd.published = make(chan *stackdriverpb.CreateTimeSeriesRequest, 100) + ln, err := net.Listen("tcp", "localhost:0") + if err != nil { + return err + } + sd.address = ln.Addr().String() + sd.srv = grpc.NewServer() + stackdriverpb.RegisterMetricServiceServer(sd.srv, sd) + // Run the server in the background. + go func() { + sd.srv.Serve(ln) + close(sd.published) + }() + return nil +} + +func (sd *stackDriverFake) CreateTimeSeries(ctx context.Context, req *stackdriverpb.CreateTimeSeriesRequest) (*emptypb.Empty, error) { + sd.published <- req + return &emptypb.Empty{}, nil +} + +func (sd *stackDriverFake) CreateMetricDescriptor(ctx context.Context, req *stackdriverpb.CreateMetricDescriptorRequest) (*metricpb.MetricDescriptor, error) { + return req.MetricDescriptor, nil +} From 3e094f8e3b2f531159852229ede93d8f519319a9 Mon Sep 17 00:00:00 2001 From: Evan Anderson Date: Thu, 10 Dec 2020 19:52:41 -0800 Subject: [PATCH 3/6] Refactor metrics tests, fix e2e_test --- metrics/config.go | 2 +- metrics/e2e_test.go | 193 +---------- metrics/exporter.go | 19 +- metrics/exporter_test.go | 10 +- metrics/resource_view_test.go | 612 +--------------------------------- 5 files changed, 48 insertions(+), 788 deletions(-) diff --git a/metrics/config.go b/metrics/config.go index d4ae24ad54..535c8e9bce 100644 --- a/metrics/config.go +++ b/metrics/config.go @@ -208,7 +208,7 @@ func createMetricsConfig(ctx context.Context, ops ExporterOptions) (*metricsConf } switch lb := metricsBackend(strings.ToLower(backend)); lb { - case stackdriver, prometheus, openCensus: + case stackdriver, prometheus, openCensus, none: mc.backendDestination = lb default: return nil, fmt.Errorf("unsupported metrics backend value %q", backend) diff --git a/metrics/e2e_test.go b/metrics/e2e_test.go index 0cc5dbced1..2762278bba 100644 --- a/metrics/e2e_test.go +++ b/metrics/e2e_test.go @@ -38,7 +38,6 @@ import ( "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" - "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/wait" emptypb "github.com/golang/protobuf/ptypes/empty" @@ -55,179 +54,12 @@ import ( ) var ( - r = resource.Resource{Labels: map[string]string{"foo": "bar"}} NamespaceTagKey = tag.MustNewKey(metricskey.LabelNamespaceName) ServiceTagKey = tag.MustNewKey(metricskey.LabelServiceName) ConfigTagKey = tag.MustNewKey(metricskey.LabelConfigurationName) RevisionTagKey = tag.MustNewKey(metricskey.LabelRevisionName) ) -func TestRegisterResourceView(t *testing.T) { - meter := meterExporterForResource(&r).m - - m := stats.Int64("testView_sum", "", stats.UnitDimensionless) - view := view.View{Name: "testView", Measure: m, Aggregation: view.Sum()} - - err := RegisterResourceView(&view) - if err != nil { - t.Fatal("RegisterResourceView =", err) - } - t.Cleanup(func() { UnregisterResourceView(&view) }) - - viewToFind := defaultMeter.m.Find("testView") - if viewToFind == nil || viewToFind.Name != "testView" { - t.Error("Registered view should be found in default meter, instead got", viewToFind) - } - - viewToFind = meter.Find("testView") - if viewToFind == nil || viewToFind.Name != "testView" { - t.Error("Registered view should be found in new meter, instead got", viewToFind) - } -} - -func TestOptionForResource(t *testing.T) { - option, err1 := optionForResource(&r) - if err1 != nil { - t.Error("Should succeed getting option, instead got error", err1) - } - optionAgain, err2 := optionForResource(&r) - if err2 != nil { - t.Error("Should succeed getting option, instead got error", err2) - } - - if fmt.Sprintf("%v", optionAgain) != fmt.Sprintf("%v", option) { - t.Errorf("Option for the same resource should not be recreated, instead got %v and %v", optionAgain, option) - } -} - -type testExporter struct { - view.Exporter - id string -} - -func TestSetFactory(t *testing.T) { - fakeFactory := func(rr *resource.Resource) (view.Exporter, error) { - if rr == nil { - return &testExporter{}, nil - } - - return &testExporter{id: rr.Labels["id"]}, nil - } - - resource123 := r - resource123.Labels["id"] = "123" - - setFactory(fakeFactory) - // Create the new meter and apply the factory - _, err := optionForResource(&resource123) - if err != nil { - t.Error("Should succeed getting option, instead got error", err) - } - - // Now get the exporter and verify the id - me := meterExporterForResource(&resource123) - e := me.e.(*testExporter) - if e.id != "123" { - t.Error("Expect id to be 123, instead got", e.id) - } - - resource456 := r - resource456.Labels["id"] = "456" - // Create the new meter and apply the factory - _, err = optionForResource(&resource456) - if err != nil { - t.Error("Should succeed getting option, instead got error", err) - } - - me = meterExporterForResource(&resource456) - e = me.e.(*testExporter) - if e.id != "456" { - t.Error("Expect id to be 456, instead got", e.id) - } -} - -func TestAllMetersExpiration(t *testing.T) { - allMeters.clock = clock.Clock(clock.NewFakeClock(time.Now())) - var fakeClock *clock.FakeClock = allMeters.clock.(*clock.FakeClock) - ClearMetersForTest() // t+0m - - // Add resource123 - resource123 := r - resource123.Labels["id"] = "123" - _, err := optionForResource(&resource123) - if err != nil { - t.Error("Should succeed getting option, instead got error ", err) - } - // (123=0m, 456=Inf) - - // Bump time to make resource123's expiry offset from resource456 - fakeClock.Step(90 * time.Second) // t+1.5m - // (123=0m, 456=Inf) - - // Add 456 - resource456 := r - resource456.Labels["id"] = "456" - _, err = optionForResource(&resource456) - if err != nil { - t.Error("Should succeed getting option, instead got error ", err) - } - allMeters.lock.Lock() - if len(allMeters.meters) != 3 { - t.Errorf("len(allMeters)=%d, want: 3", len(allMeters.meters)) - } - allMeters.lock.Unlock() - // (123=1.5m, 456=0m) - - // Warm up the older entry - fakeClock.Step(90 * time.Second) //t+3m - // (123=4.5m, 456=3m) - - // Refresh the first entry - _, err = optionForResource(&resource123) - if err != nil { - t.Error("Should succeed getting option, instead got error ", err) - } - // (123=0, 456=1.5m) - - // Expire the second entry - fakeClock.Step(9 * time.Minute) // t+12m - time.Sleep(time.Second) // Wait a second on the wallclock, so that the cleanup thread has time to finish a loop - allMeters.lock.Lock() - if len(allMeters.meters) != 2 { - t.Errorf("len(allMeters)=%d, want: 2", len(allMeters.meters)) - } - allMeters.lock.Unlock() - // (123=9m, 456=10.5m) - // non-expiring defaultMeter was just tested - - // Add resource789 - resource789 := r - resource789.Labels["id"] = "789" - _, err = optionForResource(&resource789) - if err != nil { - t.Error("Should succeed getting option, instead got error ", err) - } - // (123=9m, 456=evicted, 789=0m) -} - -func TestResourceAsString(t *testing.T) { - r1 := &resource.Resource{Type: "foobar", Labels: map[string]string{"k1": "v1", "k3": "v3", "k2": "v2"}} - r2 := &resource.Resource{Type: "foobar", Labels: map[string]string{"k2": "v2", "k3": "v3", "k1": "v1"}} - r3 := &resource.Resource{Type: "foobar", Labels: map[string]string{"k1": "v1", "k2": "v2", "k4": "v4"}} - - // Test 5 time since the iteration could be random. - for i := 0; i < 5; i++ { - - if s1, s2 := resourceToKey(r1), resourceToKey(r2); s1 != s2 { - t.Errorf("Expect same resources, but got %q and %q", s1, s2) - } - } - - if s1, s3 := resourceToKey(r1), resourceToKey(r3); s1 == s3 { - t.Error("Expect different resources, but got the same", s1) - } -} - type metricExtract struct { Name string Labels map[string]string @@ -355,11 +187,11 @@ func TestMetricsExport(t *testing.T) { harnesses := []struct { name string - init func() error + init func(t *testing.T) error validate func(t *testing.T) }{{ name: "Prometheus", - init: func() error { + init: func(t *testing.T) error { if err := UpdateExporter(context.Background(), configForBackend(prometheus), logtesting.TestLogger(t)); err != nil { return err } @@ -397,7 +229,7 @@ testComponent_testing_value{project="p1",revision="r2"} 1 }, }, { name: "OpenCensus", - init: func() error { + init: func(t *testing.T) error { if err := ocFake.start(len(resources) + 1); err != nil { return err } @@ -405,16 +237,13 @@ testComponent_testing_value{project="p1",revision="r2"} 1 return UpdateExporter(context.Background(), configForBackend(openCensus), logtesting.TestLogger(t)) }, validate: func(t *testing.T) { - t.Skip("Skipped because of excessive flakiness, see: https://github.com/knative/pkg/issues/1672") - // We unregister the views because this is one of two ways to flush // the internal aggregation buffers; the other is to have the // internal reporting period duration tick, which is at least // [new duration] in the future. - view.Unregister(globalCounter) - UnregisterResourceView(gaugeView, resourceCounter) - + metricstest.EnsureRecorded() records := []metricExtract{} + keys := map[string]struct{}{} loop: for { select { @@ -428,14 +257,16 @@ testComponent_testing_value{project="p1",revision="r2"} 1 if record.Resource != nil { labels = record.Resource.Labels } - records = append(records, metricExtract{ + metric := metricExtract{ Name: m.MetricDescriptor.Name, Labels: labels, Value: m.Timeseries[0].Points[0].GetInt64Value(), - }) + } + records = append(records, metric) + keys[metric.Key()] = struct{}{} } } - if len(records) >= len(expected) { + if len(keys) >= len(expected) { break loop } case <-time.After(4 * time.Second): @@ -450,7 +281,7 @@ testComponent_testing_value{project="p1",revision="r2"} 1 }, }, { name: "Stackdriver", - init: func() error { + init: func(t *testing.T) error { if err := initSdFake(&sdFake); err != nil { return err } @@ -485,7 +316,7 @@ testComponent_testing_value{project="p1",revision="r2"} 1 t.Run(c.name, func(t *testing.T) { ClearMetersForTest() sdFake.t = t - if err := c.init(); err != nil { + if err := c.init(t); err != nil { t.Fatalf("unable to init: %+v", err) } diff --git a/metrics/exporter.go b/metrics/exporter.go index 0c680106ee..17c07bda5e 100644 --- a/metrics/exporter.go +++ b/metrics/exporter.go @@ -23,6 +23,7 @@ import ( "strings" "sync" + "go.opencensus.io/resource" "go.opencensus.io/stats/view" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -146,7 +147,7 @@ func UpdateExporter(ctx context.Context, ops ExporterOptions, logger *zap.Sugare // Fail the process if there doesn't exist an exporter. logger.Errorw("Failed to get a valid metrics config", zap.Error(err)) } else { - logger.Errorw("Failed to get a valid metrics config; Skip updating the metrics exporter", zap.Error(err)) + logger.Errorw("Failed to get a valid metrics config; Skip updating the metrics exporter", "error", err) } return err } @@ -161,13 +162,13 @@ func UpdateExporter(ctx context.Context, ops ExporterOptions, logger *zap.Sugare flushGivenExporter(curMetricsExporter) e, f, err := newMetricsExporter(newConfig, logger) if err != nil { - logger.Errorw("Failed to update a new metrics exporter based on metric config", newConfig, zap.Error(err)) + logger.Errorw("Failed to update a new metrics exporter based on metric config", "config", newConfig, "error", err) return err } existingConfig := curMetricsConfig curMetricsExporter = e if err := setFactory(f); err != nil { - logger.Errorw("Failed to update metrics factory when loading metric config", newConfig, zap.Error(err)) + logger.Errorw("Failed to update metrics factory when loading metric config", "config", newConfig, "error", err) return err } logger.Infof("Successfully updated the metrics exporter; old config: %v; new config %v", existingConfig, newConfig) @@ -212,7 +213,10 @@ func newMetricsExporter(config *metricsConfig, logger *zap.SugaredLogger) (view. openCensus: newOpenCensusExporter, prometheus: newPrometheusExporter, none: func(*metricsConfig, *zap.SugaredLogger) (view.Exporter, ResourceExporterFactory, error) { - return nil, nil, nil + noneFactory := func(*resource.Resource) (view.Exporter, error) { + return &noneExporter{}, nil + } + return &noneExporter{}, noneFactory, nil }, } @@ -272,3 +276,10 @@ func flushGivenExporter(e view.Exporter) bool { } return false } + +type noneExporter struct { +} + +// NoneExporter implements view.Exporter in the nil case. +func (*noneExporter) ExportView(*view.Data) { +} diff --git a/metrics/exporter_test.go b/metrics/exporter_test.go index 9d692bcbcb..871c8eb4c4 100644 --- a/metrics/exporter_test.go +++ b/metrics/exporter_test.go @@ -18,6 +18,7 @@ import ( "testing" "time" + "golang.org/x/net/context" . "knative.dev/pkg/logging/testing" ) @@ -248,7 +249,14 @@ func TestInterleavedExporters(t *testing.T) { func TestFlushExporter(t *testing.T) { // No exporter - no action should be taken - setCurMetricsConfig(nil) + UpdateExporter(context.Background(), ExporterOptions{ + Domain: "test", + Component: "test", + ConfigMap: map[string]string{ + BackendDestinationKey: string(none), + }, + }, TestLogger(t)) + if want, got := false, FlushExporter(); got != want { t.Errorf("Expected %v, got %v.", want, got) } diff --git a/metrics/resource_view_test.go b/metrics/resource_view_test.go index 0cc5dbced1..6ecb9936a8 100644 --- a/metrics/resource_view_test.go +++ b/metrics/resource_view_test.go @@ -17,49 +17,18 @@ limitations under the License. package metrics import ( - "context" - "errors" "fmt" - "io" - "io/ioutil" - "net" - "net/http" - "os" - "sort" - "strings" - "sync" "testing" "time" - sd "contrib.go.opencensus.io/exporter/stackdriver" - ocmetrics "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" - ocresource "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" "go.opencensus.io/resource" "go.opencensus.io/stats" "go.opencensus.io/stats/view" - "go.opencensus.io/tag" "k8s.io/apimachinery/pkg/util/clock" - "k8s.io/apimachinery/pkg/util/wait" - - emptypb "github.com/golang/protobuf/ptypes/empty" - "github.com/google/go-cmp/cmp" - "google.golang.org/api/option" - metricpb "google.golang.org/genproto/googleapis/api/metric" - stackdriverpb "google.golang.org/genproto/googleapis/monitoring/v3" - "google.golang.org/grpc" - proto "google.golang.org/protobuf/proto" - - logtesting "knative.dev/pkg/logging/testing" - "knative.dev/pkg/metrics/metricskey" - "knative.dev/pkg/metrics/metricstest" ) var ( - r = resource.Resource{Labels: map[string]string{"foo": "bar"}} - NamespaceTagKey = tag.MustNewKey(metricskey.LabelNamespaceName) - ServiceTagKey = tag.MustNewKey(metricskey.LabelServiceName) - ConfigTagKey = tag.MustNewKey(metricskey.LabelConfigurationName) - RevisionTagKey = tag.MustNewKey(metricskey.LabelRevisionName) + r = resource.Resource{Labels: map[string]string{"foo": "bar"}} ) func TestRegisterResourceView(t *testing.T) { @@ -106,6 +75,14 @@ type testExporter struct { } func TestSetFactory(t *testing.T) { + var oldFactory ResourceExporterFactory + func() { + allMeters.lock.Lock() + defer allMeters.lock.Unlock() + + oldFactory = allMeters.factory + }() + fakeFactory := func(rr *resource.Resource) (view.Exporter, error) { if rr == nil { return &testExporter{}, nil @@ -144,6 +121,8 @@ func TestSetFactory(t *testing.T) { if e.id != "456" { t.Error("Expect id to be 456, instead got", e.id) } + + setFactory(oldFactory) } func TestAllMetersExpiration(t *testing.T) { @@ -227,572 +206,3 @@ func TestResourceAsString(t *testing.T) { t.Error("Expect different resources, but got the same", s1) } } - -type metricExtract struct { - Name string - Labels map[string]string - Value int64 -} - -func (m metricExtract) Key() string { - return fmt.Sprintf("%s<%s>", m.Name, resource.EncodeLabels(m.Labels)) -} - -func (m metricExtract) String() string { - return fmt.Sprintf("%s:%d", m.Key(), m.Value) -} - -func initSdFake(sdFake *stackDriverFake) error { - if err := sdFake.start(); err != nil { - return err - } - conn, err := grpc.Dial(sdFake.address, grpc.WithInsecure()) - if err != nil { - return err - } - newStackdriverExporterFunc = func(o sd.Options) (view.Exporter, error) { - o.MonitoringClientOptions = append(o.MonitoringClientOptions, option.WithGRPCConn(conn)) - return newOpencensusSDExporter(o) - } - // File: must exist, be json of credentialsFile, and type must be a jwtConfig or oauth2Config - tmp, err := ioutil.TempFile("", "metrics-sd-test") - if err != nil { - return err - } - defer tmp.Close() - credentialsContent := []byte(`{"type": "service_account"}`) - if _, err := tmp.Write(credentialsContent); err != nil { - return err - } - os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", tmp.Name()) - return nil -} - -func sortMetrics() cmp.Option { - return cmp.Transformer("Sort", func(in []metricExtract) []string { - out := make([]string, 0, len(in)) - seen := map[string]int{} - for _, m := range in { - // Keep only the newest report for a key - key := m.Key() - if seen[key] == 0 { - out = append(out, m.String()) - seen[key] = len(out) // Store address+1 to avoid doubling first item. - } else { - out[seen[key]-1] = m.String() - } - } - sort.Strings(out) - return out - }) -} - -// Begin table tests for exporters -func TestMetricsExport(t *testing.T) { - TestOverrideBundleCount = 1 - t.Cleanup(func() { TestOverrideBundleCount = 0 }) - ocFake := openCensusFake{address: "localhost:12345"} - sdFake := stackDriverFake{} - prometheusPort := 19090 - configForBackend := func(backend metricsBackend) ExporterOptions { - return ExporterOptions{ - Domain: servingDomain, - Component: testComponent, - PrometheusPort: prometheusPort, - ConfigMap: map[string]string{ - BackendDestinationKey: string(backend), - collectorAddressKey: ocFake.address, - allowStackdriverCustomMetricsKey: "true", - stackdriverCustomMetricSubDomainKey: servingDomain, - reportingPeriodKey: "1", - }, - } - } - - resources := []*resource.Resource{ - { - Type: "revision", - Labels: map[string]string{ - "project": "p1", - "revision": "r1", - }, - }, - { - Type: "revision", - Labels: map[string]string{ - "project": "p1", - "revision": "r2", - }, - }, - } - gauge := stats.Int64("testing/value", "Stored value", stats.UnitDimensionless) - counter := stats.Int64("export counts", "Times through the export", stats.UnitDimensionless) - gaugeView := &view.View{ - Name: "testing/value", - Description: "Test value", - Measure: gauge, - Aggregation: view.LastValue(), - } - resourceCounter := &view.View{ - Name: "resource_global_export_count", - Description: "Count of exports via RegisterResourceView.", - Measure: counter, - Aggregation: view.Count(), - } - globalCounter := &view.View{ - Name: "global_export_counts", - Description: "Count of exports via standard OpenCensus view.", - Measure: counter, - Aggregation: view.Count(), - } - - expected := []metricExtract{ - {"knative.dev/serving/testComponent/global_export_counts", map[string]string{}, 2}, - {"knative.dev/serving/testComponent/resource_global_export_count", map[string]string{}, 2}, - {"knative.dev/serving/testComponent/testing/value", map[string]string{"project": "p1", "revision": "r1"}, 0}, - {"knative.dev/serving/testComponent/testing/value", map[string]string{"project": "p1", "revision": "r2"}, 1}, - } - - harnesses := []struct { - name string - init func() error - validate func(t *testing.T) - }{{ - name: "Prometheus", - init: func() error { - if err := UpdateExporter(context.Background(), configForBackend(prometheus), logtesting.TestLogger(t)); err != nil { - return err - } - // Wait for the webserver to actually start serving metrics - return wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { - resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", prometheusPort)) - return err == nil && resp.StatusCode == http.StatusOK, nil - }) - }, - validate: func(t *testing.T) { - metricstest.EnsureRecorded() - resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", prometheusPort)) - if err != nil { - t.Fatalf("failed to fetch prometheus metrics: %+v", err) - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - t.Fatalf("failed to read prometheus response: %+v", err) - } - want := `# HELP testComponent_global_export_counts Count of exports via standard OpenCensus view. -# TYPE testComponent_global_export_counts counter -testComponent_global_export_counts 2 -# HELP testComponent_resource_global_export_count Count of exports via RegisterResourceView. -# TYPE testComponent_resource_global_export_count counter -testComponent_resource_global_export_count 2 -# HELP testComponent_testing_value Test value -# TYPE testComponent_testing_value gauge -testComponent_testing_value{project="p1",revision="r1"} 0 -testComponent_testing_value{project="p1",revision="r2"} 1 -` - if diff := cmp.Diff(want, string(body)); diff != "" { - t.Errorf("Unexpected prometheus output (-want +got):\n%s", diff) - } - }, - }, { - name: "OpenCensus", - init: func() error { - if err := ocFake.start(len(resources) + 1); err != nil { - return err - } - t.Log("Created exporter at", ocFake.address) - return UpdateExporter(context.Background(), configForBackend(openCensus), logtesting.TestLogger(t)) - }, - validate: func(t *testing.T) { - t.Skip("Skipped because of excessive flakiness, see: https://github.com/knative/pkg/issues/1672") - - // We unregister the views because this is one of two ways to flush - // the internal aggregation buffers; the other is to have the - // internal reporting period duration tick, which is at least - // [new duration] in the future. - view.Unregister(globalCounter) - UnregisterResourceView(gaugeView, resourceCounter) - - records := []metricExtract{} - loop: - for { - select { - case record := <-ocFake.published: - if record == nil { - continue loop - } - for _, m := range record.Metrics { - if len(m.Timeseries) > 0 { - labels := map[string]string{} - if record.Resource != nil { - labels = record.Resource.Labels - } - records = append(records, metricExtract{ - Name: m.MetricDescriptor.Name, - Labels: labels, - Value: m.Timeseries[0].Points[0].GetInt64Value(), - }) - } - } - if len(records) >= len(expected) { - break loop - } - case <-time.After(4 * time.Second): - t.Error("Timeout reading input") - break loop - } - } - - if diff := cmp.Diff(expected, records, sortMetrics()); diff != "" { - t.Errorf("Unexpected OpenCensus exports (-want +got):\n%s", diff) - } - }, - }, { - name: "Stackdriver", - init: func() error { - if err := initSdFake(&sdFake); err != nil { - return err - } - return UpdateExporter(context.Background(), configForBackend(stackdriver), logtesting.TestLogger(t)) - }, - validate: func(t *testing.T) { - records := []metricExtract{} - for record := range sdFake.published { - for _, ts := range record.TimeSeries { - name := ts.Metric.Type[len("custom.googleapis.com/"):] - records = append(records, metricExtract{ - Name: name, - Labels: ts.Resource.Labels, - Value: ts.Points[0].Value.GetInt64Value(), - }) - } - if len(records) >= 4 { - // There's no way to synchronize on the internal timer used - // by metricsexport.IntervalReader, so shut down the - // exporter after the first report cycle. - FlushExporter() - sdFake.srv.GracefulStop() - } - } - if diff := cmp.Diff(expected, records, sortMetrics()); diff != "" { - t.Errorf("Unexpected Stackdriver exports (-want +got):\n%s", diff) - } - }, - }} - - for _, c := range harnesses { - t.Run(c.name, func(t *testing.T) { - ClearMetersForTest() - sdFake.t = t - if err := c.init(); err != nil { - t.Fatalf("unable to init: %+v", err) - } - - view.Register(globalCounter) - if err := RegisterResourceView(gaugeView, resourceCounter); err != nil { - t.Fatal("Unable to register views:", err) - } - t.Cleanup(func() { - view.Unregister(globalCounter) - UnregisterResourceView(gaugeView, resourceCounter) - }) - - for i, r := range resources { - ctx := context.Background() - Record(ctx, counter.M(int64(1))) - if r != nil { - ctx = metricskey.WithResource(ctx, *r) - } - Record(ctx, gauge.M(int64(i))) - } - c.validate(t) - }) - } -} - -func TestStackDriverExports(t *testing.T) { - TestOverrideBundleCount = 1 - t.Cleanup(func() { TestOverrideBundleCount = 0 }) - eo := ExporterOptions{ - Domain: servingDomain, - Component: "autoscaler", - ConfigMap: map[string]string{ - BackendDestinationKey: string(stackdriver), - reportingPeriodKey: "1", - stackdriverProjectIDKey: "foobar", - }, - } - - label1 := map[string]string{ - "cluster_name": "test-cluster", - "configuration_name": "config", - "location": "test-location", - "namespace_name": "ns", - "project_id": "foobar", - "revision_name": "revision", - "service_name": "service", - } - label2 := map[string]string{ - "cluster_name": "test-cluster", - "configuration_name": "config2", - "location": "test-location", - "namespace_name": "ns2", - "project_id": "foobar", - "revision_name": "revision2", - "service_name": "service2", - } - batchLabels := map[string]string{ - "namespace_name": "ns2", - "configuration_name": "config2", - "revision_name": "revision2", - "service_name": "service2", - } - harness := []struct { - name string - allowCustomMetrics string - expected []metricExtract - }{{ - name: "Allow custom metrics", - allowCustomMetrics: "true", - expected: []metricExtract{ - { - "knative.dev/serving/autoscaler/actual_pods", - label1, - 1, - }, - { - "knative.dev/serving/autoscaler/desired_pods", - label2, - 2, - }, - { - "custom.googleapis.com/knative.dev/autoscaler/not_ready_pods", - batchLabels, - 3, - }, - }, - }, { - name: "Don't allow custom metrics", - allowCustomMetrics: "false", - expected: []metricExtract{ - { - "knative.dev/serving/autoscaler/actual_pods", - label1, - 1, - }, - { - "knative.dev/serving/autoscaler/desired_pods", - label2, - 2, - }, - }, - }} - - for _, tc := range harness { - t.Run(tc.name, func(t *testing.T) { - eo.ConfigMap[allowStackdriverCustomMetricsKey] = tc.allowCustomMetrics - // Change the cluster name to reinitialize the exporter and pick up a new port. - eo.ConfigMap[stackdriverClusterNameKey] = tc.name - actualPodCountM := stats.Int64( - "actual_pods", - "Number of pods that are allocated currently", - stats.UnitDimensionless) - actualPodsCountView := &view.View{ - Description: "Number of pods that are allocated currently", - Measure: actualPodCountM, - Aggregation: view.LastValue(), - TagKeys: []tag.Key{NamespaceTagKey, ServiceTagKey, ConfigTagKey, RevisionTagKey}, - } - desiredPodCountM := stats.Int64( - "desired_pods", - "Number of pods that are desired", - stats.UnitDimensionless) - desiredPodsCountView := &view.View{ - Description: "Number of pods that are desired", - Measure: desiredPodCountM, - Aggregation: view.LastValue(), - } - notReadyPodCountM := stats.Int64( - "not_ready_pods", - "Number of pods that are not ready", - stats.UnitDimensionless) - customView := &view.View{ - Description: "non-knative-revision metric per KnativeRevisionMetrics", - Measure: notReadyPodCountM, - Aggregation: view.LastValue(), - } - - sdFake := stackDriverFake{t: t} - if err := initSdFake(&sdFake); err != nil { - t.Error("Init stackdriver failed", err) - } - if err := UpdateExporter(context.Background(), eo, logtesting.TestLogger(t)); err != nil { - t.Error("UpdateExporter failed", err) - } - - if err := RegisterResourceView(desiredPodsCountView, actualPodsCountView, customView); err != nil { - t.Fatalf("unable to register view: %+v", err) - } - t.Cleanup(func() { - UnregisterResourceView(desiredPodsCountView, actualPodsCountView, customView) - }) - - ctx, err := tag.New(context.Background(), tag.Upsert(NamespaceTagKey, "ns"), - tag.Upsert(ServiceTagKey, "service"), - tag.Upsert(ConfigTagKey, "config"), - tag.Upsert(RevisionTagKey, "revision")) - if err != nil { - t.Fatal("Unable to create tags", err) - } - Record(ctx, actualPodCountM.M(int64(1))) - - r := resource.Resource{ - Type: "testing", - Labels: batchLabels, - } - RecordBatch( - metricskey.WithResource(context.Background(), r), - desiredPodCountM.M(int64(2)), - notReadyPodCountM.M(int64(3))) - - records := []metricExtract{} - loop: - for { - select { - case record := <-sdFake.published: - for _, ts := range record.TimeSeries { - extracted := metricExtract{ - Name: ts.Metric.Type, - Labels: ts.Resource.Labels, - Value: ts.Points[0].Value.GetInt64Value(), - } - // Override 'cluster-name' label to reset to a fixed value - if extracted.Labels["cluster_name"] != "" { - extracted.Labels["cluster_name"] = "test-cluster" - } - records = append(records, extracted) - if strings.HasPrefix(ts.Metric.Type, "knative.dev/") { - if diff := cmp.Diff(ts.Resource.Type, metricskey.ResourceTypeKnativeRevision); diff != "" { - t.Errorf("Incorrect resource type for %q: (-want +got):\n%s", ts.Metric.Type, diff) - } - } - } - if len(records) >= len(tc.expected) { - // There's no way to synchronize on the internal timer used - // by metricsexport.IntervalReader, so shut down the - // exporter after the first report cycle. - FlushExporter() - sdFake.srv.GracefulStop() - break loop - } - case <-time.After(4 * time.Second): - t.Error("Timeout reading records from Stackdriver") - break loop - } - } - if diff := cmp.Diff(tc.expected, records, sortMetrics()); diff != "" { - t.Errorf("Unexpected stackdriver knative exports (-want +got):\n%s", diff) - } - }) - } -} - -type openCensusFake struct { - ocmetrics.UnimplementedMetricsServiceServer - address string - srv *grpc.Server - exports sync.WaitGroup - wg sync.WaitGroup - published chan *ocmetrics.ExportMetricsServiceRequest -} - -func (oc *openCensusFake) start(expectedStreams int) error { - ln, err := net.Listen("tcp", oc.address) - if err != nil { - return err - } - oc.published = make(chan *ocmetrics.ExportMetricsServiceRequest, 100) - oc.srv = grpc.NewServer() - ocmetrics.RegisterMetricsServiceServer(oc.srv, oc) - // Run the server in the background. - oc.wg.Add(1) - go func() { - oc.srv.Serve(ln) - oc.wg.Done() - oc.wg.Wait() - close(oc.published) - }() - oc.exports.Add(expectedStreams) - go oc.stop() - return nil -} - -func (oc *openCensusFake) stop() { - oc.exports.Wait() - oc.srv.Stop() -} - -func (oc *openCensusFake) Export(stream ocmetrics.MetricsService_ExportServer) error { - var streamResource *ocresource.Resource - oc.wg.Add(1) - defer oc.wg.Done() - metricSeen := false - for { - in, err := stream.Recv() - if errors.Is(err, io.EOF) { - return nil - } - if err != nil { - return err - } - if in.Resource != nil { - // The stream is stateful, keep track of the last Resource seen. - streamResource = in.Resource - } - if len(in.Metrics) > 0 { - if in.Resource == nil { - in.Resource = streamResource - } - oc.published <- proto.Clone(in).(*ocmetrics.ExportMetricsServiceRequest) - if !metricSeen { - oc.exports.Done() - metricSeen = true - } - } - } -} - -type stackDriverFake struct { - stackdriverpb.UnimplementedMetricServiceServer - address string - srv *grpc.Server - t *testing.T - published chan *stackdriverpb.CreateTimeSeriesRequest -} - -func (sd *stackDriverFake) start() error { - sd.published = make(chan *stackdriverpb.CreateTimeSeriesRequest, 100) - ln, err := net.Listen("tcp", "localhost:0") - if err != nil { - return err - } - sd.address = ln.Addr().String() - sd.srv = grpc.NewServer() - stackdriverpb.RegisterMetricServiceServer(sd.srv, sd) - // Run the server in the background. - go func() { - sd.srv.Serve(ln) - close(sd.published) - }() - return nil -} - -func (sd *stackDriverFake) CreateTimeSeries(ctx context.Context, req *stackdriverpb.CreateTimeSeriesRequest) (*emptypb.Empty, error) { - sd.published <- req - return &emptypb.Empty{}, nil -} - -func (sd *stackDriverFake) CreateMetricDescriptor(ctx context.Context, req *stackdriverpb.CreateMetricDescriptorRequest) (*metricpb.MetricDescriptor, error) { - return req.MetricDescriptor, nil -} From d12a9437ed5eea82a52fd23964015622edf35c65 Mon Sep 17 00:00:00 2001 From: Evan Anderson Date: Fri, 11 Dec 2020 01:13:55 -0800 Subject: [PATCH 4/6] Address yanweiguo feedback --- metrics/e2e_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/metrics/e2e_test.go b/metrics/e2e_test.go index 2762278bba..cf66f30b6f 100644 --- a/metrics/e2e_test.go +++ b/metrics/e2e_test.go @@ -237,12 +237,12 @@ testComponent_testing_value{project="p1",revision="r2"} 1 return UpdateExporter(context.Background(), configForBackend(openCensus), logtesting.TestLogger(t)) }, validate: func(t *testing.T) { - // We unregister the views because this is one of two ways to flush - // the internal aggregation buffers; the other is to have the - // internal reporting period duration tick, which is at least - // [new duration] in the future. metricstest.EnsureRecorded() records := []metricExtract{} + // Each Resource has an independent thread invoking reportView; this + // set avoids the race condition where we get two reports for the + // same metric on the channel before we get any reports for another + // metric. keys := map[string]struct{}{} loop: for { From 781822d0d0f0a84f0bea30580b2770e74a85e4c0 Mon Sep 17 00:00:00 2001 From: Evan Anderson Date: Mon, 14 Dec 2020 15:22:39 -0800 Subject: [PATCH 5/6] Cleanup per vagababov --- metrics/e2e_test.go | 87 ++++++++++++++++++++------------------------- 1 file changed, 39 insertions(+), 48 deletions(-) diff --git a/metrics/e2e_test.go b/metrics/e2e_test.go index cf66f30b6f..2163f0c1b6 100644 --- a/metrics/e2e_test.go +++ b/metrics/e2e_test.go @@ -74,7 +74,7 @@ func (m metricExtract) String() string { return fmt.Sprintf("%s:%d", m.Key(), m.Value) } -func initSdFake(sdFake *stackDriverFake) error { +func initStackdriverFake(sdFake *stackDriverFake) error { if err := sdFake.start(); err != nil { return err } @@ -141,22 +141,19 @@ func TestMetricsExport(t *testing.T) { } } - resources := []*resource.Resource{ - { - Type: "revision", - Labels: map[string]string{ - "project": "p1", - "revision": "r1", - }, + resources := []*resource.Resource{{ + Type: "revision", + Labels: map[string]string{ + "project": "p1", + "revision": "r1", }, - { - Type: "revision", - Labels: map[string]string{ - "project": "p1", - "revision": "r2", - }, + }, { + Type: "revision", + Labels: map[string]string{ + "project": "p1", + "revision": "r2", }, - } + }} gauge := stats.Int64("testing/value", "Stored value", stats.UnitDimensionless) counter := stats.Int64("export counts", "Times through the export", stats.UnitDimensionless) gaugeView := &view.View{ @@ -212,7 +209,7 @@ func TestMetricsExport(t *testing.T) { if err != nil { t.Fatalf("failed to read prometheus response: %+v", err) } - want := `# HELP testComponent_global_export_counts Count of exports via standard OpenCensus view. + const want = `# HELP testComponent_global_export_counts Count of exports via standard OpenCensus view. # TYPE testComponent_global_export_counts counter testComponent_global_export_counts 2 # HELP testComponent_resource_global_export_count Count of exports via RegisterResourceView. @@ -244,6 +241,7 @@ testComponent_testing_value{project="p1",revision="r2"} 1 // same metric on the channel before we get any reports for another // metric. keys := map[string]struct{}{} + timeout := time.After(5 * time.Second) loop: for { select { @@ -269,7 +267,7 @@ testComponent_testing_value{project="p1",revision="r2"} 1 if len(keys) >= len(expected) { break loop } - case <-time.After(4 * time.Second): + case <-timeout: t.Error("Timeout reading input") break loop } @@ -282,7 +280,7 @@ testComponent_testing_value{project="p1",revision="r2"} 1 }, { name: "Stackdriver", init: func(t *testing.T) error { - if err := initSdFake(&sdFake); err != nil { + if err := initStackdriverFake(&sdFake); err != nil { return err } return UpdateExporter(context.Background(), configForBackend(stackdriver), logtesting.TestLogger(t)) @@ -386,38 +384,31 @@ func TestStackDriverExports(t *testing.T) { }{{ name: "Allow custom metrics", allowCustomMetrics: "true", - expected: []metricExtract{ - { - "knative.dev/serving/autoscaler/actual_pods", - label1, - 1, - }, - { - "knative.dev/serving/autoscaler/desired_pods", - label2, - 2, - }, - { - "custom.googleapis.com/knative.dev/autoscaler/not_ready_pods", - batchLabels, - 3, - }, - }, + expected: []metricExtract{{ + "knative.dev/serving/autoscaler/actual_pods", + label1, + 1, + }, { + "knative.dev/serving/autoscaler/desired_pods", + label2, + 2, + }, { + "custom.googleapis.com/knative.dev/autoscaler/not_ready_pods", + batchLabels, + 3, + }}, }, { name: "Don't allow custom metrics", allowCustomMetrics: "false", - expected: []metricExtract{ - { - "knative.dev/serving/autoscaler/actual_pods", - label1, - 1, - }, - { - "knative.dev/serving/autoscaler/desired_pods", - label2, - 2, - }, - }, + expected: []metricExtract{{ + "knative.dev/serving/autoscaler/actual_pods", + label1, + 1, + }, { + "knative.dev/serving/autoscaler/desired_pods", + label2, + 2, + }}, }} for _, tc := range harness { @@ -455,7 +446,7 @@ func TestStackDriverExports(t *testing.T) { } sdFake := stackDriverFake{t: t} - if err := initSdFake(&sdFake); err != nil { + if err := initStackdriverFake(&sdFake); err != nil { t.Error("Init stackdriver failed", err) } if err := UpdateExporter(context.Background(), eo, logtesting.TestLogger(t)); err != nil { From 572583642c8f68b1f68d461307a874bb36beb8df Mon Sep 17 00:00:00 2001 From: Evan Anderson Date: Fri, 18 Dec 2020 10:29:20 -0800 Subject: [PATCH 6/6] Switch to zap.Error but avoid the bug the code previously had. --- metrics/exporter.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/metrics/exporter.go b/metrics/exporter.go index 17c07bda5e..b98a18de1f 100644 --- a/metrics/exporter.go +++ b/metrics/exporter.go @@ -147,7 +147,7 @@ func UpdateExporter(ctx context.Context, ops ExporterOptions, logger *zap.Sugare // Fail the process if there doesn't exist an exporter. logger.Errorw("Failed to get a valid metrics config", zap.Error(err)) } else { - logger.Errorw("Failed to get a valid metrics config; Skip updating the metrics exporter", "error", err) + logger.Errorw("Failed to get a valid metrics config; Skip updating the metrics exporter", zap.Error(err)) } return err } @@ -162,13 +162,13 @@ func UpdateExporter(ctx context.Context, ops ExporterOptions, logger *zap.Sugare flushGivenExporter(curMetricsExporter) e, f, err := newMetricsExporter(newConfig, logger) if err != nil { - logger.Errorw("Failed to update a new metrics exporter based on metric config", "config", newConfig, "error", err) + logger.Errorw("Failed to update a new metrics exporter based on metric config", zap.Error(err), "config", newConfig) return err } existingConfig := curMetricsConfig curMetricsExporter = e if err := setFactory(f); err != nil { - logger.Errorw("Failed to update metrics factory when loading metric config", "config", newConfig, "error", err) + logger.Errorw("Failed to update metrics factory when loading metric config", zap.Error(err), "config", newConfig) return err } logger.Infof("Successfully updated the metrics exporter; old config: %v; new config %v", existingConfig, newConfig)