From fc18ec35eb1cf786aa6d6cb9ae33522b947a9f30 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 3 Mar 2017 18:02:32 -0500 Subject: [PATCH] Add RPC metrics emitter as Observer/SpanObserver (#103) --- .travis.yml | 1 - CHANGELOG.md | 3 +- rpcmetrics/README.md | 5 + rpcmetrics/doc.go | 22 ++++ rpcmetrics/endpoints.go | 69 +++++++++++++ rpcmetrics/endpoints_test.go | 49 +++++++++ rpcmetrics/metrics.go | 130 ++++++++++++++++++++++++ rpcmetrics/metrics_test.go | 60 +++++++++++ rpcmetrics/normalizer.go | 107 +++++++++++++++++++ rpcmetrics/normalizer_test.go | 40 ++++++++ rpcmetrics/observer.go | 176 ++++++++++++++++++++++++++++++++ rpcmetrics/observer_test.go | 186 ++++++++++++++++++++++++++++++++++ 12 files changed, 846 insertions(+), 2 deletions(-) create mode 100644 rpcmetrics/README.md create mode 100644 rpcmetrics/doc.go create mode 100644 rpcmetrics/endpoints.go create mode 100644 rpcmetrics/endpoints_test.go create mode 100644 rpcmetrics/metrics.go create mode 100644 rpcmetrics/metrics_test.go create mode 100644 rpcmetrics/normalizer.go create mode 100644 rpcmetrics/normalizer_test.go create mode 100644 rpcmetrics/observer.go create mode 100644 rpcmetrics/observer_test.go diff --git a/.travis.yml b/.travis.yml index 4541bb99..f8464488 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,6 @@ sudo: required language: go go: - - 1.6 - 1.7 services: diff --git a/CHANGELOG.md b/CHANGELOG.md index db0d596b..0c84bce3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,8 @@ Changes by Version 2.2.0 (Unreleased) ------------------ -- Introduce Observer and SpanObserver +- Introduce Observer and SpanObserver (https://github.com/uber/jaeger-client-go/pull/94) +- Add RPC metrics emitter as Observer/SpanObserver (https://github.com/uber/jaeger-client-go/pull/103) 2.1.2 (2016-02-27) diff --git a/rpcmetrics/README.md b/rpcmetrics/README.md new file mode 100644 index 00000000..879948e9 --- /dev/null +++ b/rpcmetrics/README.md @@ -0,0 +1,5 @@ +An Observer that can be used to emit RPC metrics +================================================ + +It can be attached to the tracer during tracer construction. +See `ExampleObserver` function in [observer_test.go](./observer_test.go). diff --git a/rpcmetrics/doc.go b/rpcmetrics/doc.go new file mode 100644 index 00000000..5b73b140 --- /dev/null +++ b/rpcmetrics/doc.go @@ -0,0 +1,22 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Package rpcmetrics implements an Observer that can be used to emit RPC metrics. +package rpcmetrics diff --git a/rpcmetrics/endpoints.go b/rpcmetrics/endpoints.go new file mode 100644 index 00000000..79ec1c8e --- /dev/null +++ b/rpcmetrics/endpoints.go @@ -0,0 +1,69 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package rpcmetrics + +import "sync" + +// normalizedEndpoints is a cache for endpointName -> safeName mappings. +type normalizedEndpoints struct { + names map[string]string + maxSize int + defaultName string + normalizer NameNormalizer + mux sync.RWMutex +} + +func newNormalizedEndpoints(maxSize int, normalizer NameNormalizer) *normalizedEndpoints { + return &normalizedEndpoints{ + maxSize: maxSize, + normalizer: normalizer, + names: make(map[string]string, maxSize), + } +} + +// normalize looks up the name in the cache, if not found it uses normalizer +// to convert the name to a safe name. If called with more than maxSize unique +// names it returns "" for all other names beyond those already cached. +func (n *normalizedEndpoints) normalize(name string) string { + n.mux.RLock() + norm, ok := n.names[name] + l := len(n.names) + n.mux.RUnlock() + if ok { + return norm + } + if l >= n.maxSize { + return "" + } + return n.normalizeWithLock(name) +} + +func (n *normalizedEndpoints) normalizeWithLock(name string) string { + norm := n.normalizer.Normalize(name) + n.mux.Lock() + defer n.mux.Unlock() + // cache may have grown while we were not holding the lock + if len(n.names) >= n.maxSize { + return "" + } + n.names[name] = norm + return norm +} diff --git a/rpcmetrics/endpoints_test.go b/rpcmetrics/endpoints_test.go new file mode 100644 index 00000000..39219e5e --- /dev/null +++ b/rpcmetrics/endpoints_test.go @@ -0,0 +1,49 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package rpcmetrics + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNormalizedEndpoints(t *testing.T) { + n := newNormalizedEndpoints(1, DefaultNameNormalizer) + + assertLen := func(l int) { + n.mux.RLock() + defer n.mux.RUnlock() + assert.Len(t, n.names, l) + } + + assert.Equal(t, "ab-cd", n.normalize("ab^cd"), "one translation") + assert.Equal(t, "ab-cd", n.normalize("ab^cd"), "cache hit") + assertLen(1) + assert.Equal(t, "", n.normalize("xys"), "cache overflow") + assertLen(1) +} + +func TestNormalizedEndpointsDoubleLocking(t *testing.T) { + n := newNormalizedEndpoints(1, DefaultNameNormalizer) + assert.Equal(t, "ab-cd", n.normalize("ab^cd"), "fill out the cache") + assert.Equal(t, "", n.normalizeWithLock("xys"), "cache overflow") +} diff --git a/rpcmetrics/metrics.go b/rpcmetrics/metrics.go new file mode 100644 index 00000000..82c40559 --- /dev/null +++ b/rpcmetrics/metrics.go @@ -0,0 +1,130 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package rpcmetrics + +import ( + "sync" + + "github.com/uber/jaeger-lib/metrics" +) + +const ( + otherEndpointsPlaceholder = "other" + endpointNameMetricTag = "endpoint" +) + +// Metrics is a collection of metrics for an endpoint describing +// throughput, success, errors, and performance. +type Metrics struct { + // Requests is a counter of the total number of successes + failures. + Requests metrics.Counter `metric:"requests"` + + // Success is a counter of the total number of successes. + Success metrics.Counter `metric:"success"` + + // Failures is a counter of the number of times any failure has been observed. + Failures metrics.Counter `metric:"failures"` + + // RequestLatencyMs is a histogram of the latency of requests in milliseconds. + RequestLatencyMs metrics.Timer `metric:"request_latency_ms"` + + // HTTPStatusCode2xx is a counter of the total number of requests with HTTP status code 200-299 + HTTPStatusCode2xx metrics.Counter `metric:"http_status_code_2xx"` + + // HTTPStatusCode3xx is a counter of the total number of requests with HTTP status code 300-399 + HTTPStatusCode3xx metrics.Counter `metric:"http_status_code_3xx"` + + // HTTPStatusCode4xx is a counter of the total number of requests with HTTP status code 400-499 + HTTPStatusCode4xx metrics.Counter `metric:"http_status_code_4xx"` + + // HTTPStatusCode5xx is a counter of the total number of requests with HTTP status code 500-599 + HTTPStatusCode5xx metrics.Counter `metric:"http_status_code_5xx"` +} + +func (m *Metrics) recordHTTPStatusCode(statusCode uint16) { + if statusCode >= 200 && statusCode < 300 { + m.HTTPStatusCode2xx.Inc(1) + } else if statusCode >= 300 && statusCode < 400 { + m.HTTPStatusCode3xx.Inc(1) + } else if statusCode >= 400 && statusCode < 500 { + m.HTTPStatusCode4xx.Inc(1) + } else if statusCode >= 500 && statusCode < 600 { + m.HTTPStatusCode5xx.Inc(1) + } +} + +// MetricsByEndpoint is a registry/cache of metrics for each unique endpoint name. +// Only maxNumberOfEndpoints Metrics are stored, all other endpoint names are mapped +// to a generic endpoint name "other". +type MetricsByEndpoint struct { + metricsFactory metrics.Factory + endpoints *normalizedEndpoints + metricsByEndpoint map[string]*Metrics + mux sync.RWMutex +} + +func newMetricsByEndpoint( + metricsFactory metrics.Factory, + normalizer NameNormalizer, + maxNumberOfEndpoints int, +) *MetricsByEndpoint { + return &MetricsByEndpoint{ + metricsFactory: metricsFactory, + endpoints: newNormalizedEndpoints(maxNumberOfEndpoints, normalizer), + metricsByEndpoint: make(map[string]*Metrics, maxNumberOfEndpoints+1), // +1 for "other" + } +} + +func (m *MetricsByEndpoint) get(endpoint string) *Metrics { + safeName := m.endpoints.normalize(endpoint) + if safeName == "" { + safeName = otherEndpointsPlaceholder + } + m.mux.RLock() + met := m.metricsByEndpoint[safeName] + m.mux.RUnlock() + if met != nil { + return met + } + + return m.getWithWriteLock(safeName) +} + +// split to make easier to test +func (m *MetricsByEndpoint) getWithWriteLock(safeName string) *Metrics { + m.mux.Lock() + defer m.mux.Unlock() + + // it is possible that the name has been already registered after we released + // the read lock and before we grabbed the write lock, so check for that. + if met, ok := m.metricsByEndpoint[safeName]; ok { + return met + } + + // it would be nice to create the struct before locking, since Init() is somewhat + // expensive, however some metrics backends (e.g. expvar) may not like duplicate metrics. + met := &Metrics{} + tags := map[string]string{endpointNameMetricTag: safeName} + metrics.Init(met, m.metricsFactory, tags) + + m.metricsByEndpoint[safeName] = met + return met +} diff --git a/rpcmetrics/metrics_test.go b/rpcmetrics/metrics_test.go new file mode 100644 index 00000000..670389be --- /dev/null +++ b/rpcmetrics/metrics_test.go @@ -0,0 +1,60 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package rpcmetrics + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/uber/jaeger-lib/metrics" + "github.com/uber/jaeger-lib/metrics/testutils" +) + +func endpointTags(endpoint string) map[string]string { + return map[string]string{ + "endpoint": endpoint, + } +} + +func TestMetricsByEndpoint(t *testing.T) { + met := metrics.NewLocalFactory(0) + mbe := newMetricsByEndpoint(met, DefaultNameNormalizer, 2) + + m1 := mbe.get("abc1") + m2 := mbe.get("abc1") // from cache + m2a := mbe.getWithWriteLock("abc1") // from cache in double-checked lock + assert.Equal(t, m1, m2) + assert.Equal(t, m1, m2a) + + m3 := mbe.get("abc3") + m4 := mbe.get("overflow") + m5 := mbe.get("overflow2") + + for _, m := range []*Metrics{m1, m2, m2a, m3, m4, m5} { + m.Requests.Inc(1) + } + + testutils.AssertCounterMetrics(t, met, + testutils.ExpectedMetric{Name: "requests", Tags: endpointTags("abc1"), Value: 3}, + testutils.ExpectedMetric{Name: "requests", Tags: endpointTags("abc3"), Value: 1}, + testutils.ExpectedMetric{Name: "requests", Tags: endpointTags("other"), Value: 2}, + ) +} diff --git a/rpcmetrics/normalizer.go b/rpcmetrics/normalizer.go new file mode 100644 index 00000000..ac0f79b0 --- /dev/null +++ b/rpcmetrics/normalizer.go @@ -0,0 +1,107 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package rpcmetrics + +// NameNormalizer is used to convert the endpoint names to strings +// that can be safely used as tags in the metrics. +type NameNormalizer interface { + Normalize(name string) string +} + +// DefaultNameNormalizer converts endpoint names so that they contain only characters +// from the safe charset [a-zA-Z0-9-./_]. All other characters are replaced with '-'. +var DefaultNameNormalizer = &SimpleNameNormalizer{ + SafeSets: []SafeCharacterSet{ + &Range{From: 'a', To: 'z'}, + &Range{From: 'A', To: 'Z'}, + &Range{From: '0', To: '9'}, + &Char{'-'}, + &Char{'_'}, + &Char{'/'}, + &Char{'.'}, + }, + Replacement: '-', +} + +// SimpleNameNormalizer uses a set of safe character sets. +type SimpleNameNormalizer struct { + SafeSets []SafeCharacterSet + Replacement byte +} + +// SafeCharacterSet determines if the given character is "safe" +type SafeCharacterSet interface { + IsSafe(c byte) bool +} + +// Range implements SafeCharacterSet +type Range struct { + From, To byte +} + +// IsSafe implements SafeCharacterSet +func (r *Range) IsSafe(c byte) bool { + return c >= r.From && c <= r.To +} + +// Char implements SafeCharacterSet +type Char struct { + Val byte +} + +// IsSafe implements SafeCharacterSet +func (ch *Char) IsSafe(c byte) bool { + return c == ch.Val +} + +// Normalize checks each character in the string against SafeSets, +// and if it's not safe substitutes it with Replacement. +func (n *SimpleNameNormalizer) Normalize(name string) string { + var retMe []byte + nameBytes := []byte(name) + for i, b := range nameBytes { + if n.safeByte(b) { + if retMe != nil { + retMe[i] = b + } + } else { + if retMe == nil { + retMe = make([]byte, len(nameBytes)) + copy(retMe[0:i], nameBytes[0:i]) + } + retMe[i] = n.Replacement + } + } + if retMe == nil { + return name + } + return string(retMe) +} + +// safeByte checks if b against all safe charsets. +func (n *SimpleNameNormalizer) safeByte(b byte) bool { + for i := range n.SafeSets { + if n.SafeSets[i].IsSafe(b) { + return true + } + } + return false +} diff --git a/rpcmetrics/normalizer_test.go b/rpcmetrics/normalizer_test.go new file mode 100644 index 00000000..0ebdacbb --- /dev/null +++ b/rpcmetrics/normalizer_test.go @@ -0,0 +1,40 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package rpcmetrics + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSimpleNameNormalizer(t *testing.T) { + n := &SimpleNameNormalizer{ + SafeSets: []SafeCharacterSet{ + &Range{From: 'a', To: 'z'}, + &Char{'-'}, + }, + Replacement: '-', + } + assert.Equal(t, "ab-cd", n.Normalize("ab-cd"), "all valid") + assert.Equal(t, "ab-cd", n.Normalize("ab.cd"), "single mismatch") + assert.Equal(t, "a--cd", n.Normalize("aB-cd"), "range letter mismatch") +} diff --git a/rpcmetrics/observer.go b/rpcmetrics/observer.go new file mode 100644 index 00000000..10e5dec0 --- /dev/null +++ b/rpcmetrics/observer.go @@ -0,0 +1,176 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package rpcmetrics + +import ( + "strconv" + "sync" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + "github.com/uber/jaeger-lib/metrics" + + jaeger "github.com/uber/jaeger-client-go" +) + +const defaultMaxNumberOfEndpoints = 200 + +// Observer is an observer that can emit RPC metrics. +type Observer struct { + metricsByEndpoint *MetricsByEndpoint +} + +// NewObserver creates a new observer that can emit RPC metrics. +func NewObserver(metricsFactory metrics.Factory, normalizer NameNormalizer) *Observer { + return &Observer{ + metricsByEndpoint: newMetricsByEndpoint( + metricsFactory, + normalizer, + defaultMaxNumberOfEndpoints, + ), + } +} + +// OnStartSpan creates a new Observer for the span. +func (o *Observer) OnStartSpan( + operationName string, + options opentracing.StartSpanOptions, +) jaeger.SpanObserver { + return NewSpanObserver(o.metricsByEndpoint, operationName, options) +} + +// SpanKind identifies the span as inboud, outbound, or internal +type SpanKind int + +const ( + // Local span kind + Local SpanKind = iota + // Inbound span kind + Inbound + // Outbound span kind + Outbound +) + +// SpanObserver collects RPC metrics +type SpanObserver struct { + metricsByEndpoint *MetricsByEndpoint + operationName string + startTime time.Time + mux sync.Mutex + kind SpanKind + httpStatusCode uint16 + err bool +} + +// NewSpanObserver creates a new SpanObserver that can emit RPC metrics. +func NewSpanObserver( + metricsByEndpoint *MetricsByEndpoint, + operationName string, + options opentracing.StartSpanOptions, +) *SpanObserver { + so := &SpanObserver{ + metricsByEndpoint: metricsByEndpoint, + operationName: operationName, + startTime: options.StartTime, + } + for k, v := range options.Tags { + so.handleTagInLock(k, v) + } + return so +} + +// handleTags watches for special tags +// - SpanKind +// - HttpStatusCode +// - Error +func (so *SpanObserver) handleTagInLock(key string, value interface{}) { + if key == string(ext.SpanKind) { + if v, ok := value.(ext.SpanKindEnum); ok { + value = string(v) + } + if v, ok := value.(string); ok { + if v == string(ext.SpanKindRPCClientEnum) { + so.kind = Outbound + } else if v == string(ext.SpanKindRPCServerEnum) { + so.kind = Inbound + } + } + return + } + if key == string(ext.HTTPStatusCode) { + if v, ok := value.(uint16); ok { + so.httpStatusCode = v + } else if v, ok := value.(int); ok { + so.httpStatusCode = uint16(v) + } else if v, ok := value.(string); ok { + if vv, err := strconv.Atoi(v); err == nil { + so.httpStatusCode = uint16(vv) + } + } + return + } + if key == string(ext.Error) { + if v, ok := value.(bool); ok { + so.err = v + } else if v, ok := value.(string); ok { + if vv, err := strconv.ParseBool(v); err == nil { + so.err = vv + } + } + return + } +} + +// OnFinish emits the RPC metrics. It only has an effect when operation name +// is not blank, and the span kind is an RPC server. +func (so *SpanObserver) OnFinish(options opentracing.FinishOptions) { + so.mux.Lock() + defer so.mux.Unlock() + + if so.operationName == "" || so.kind != Inbound { + return + } + + mets := so.metricsByEndpoint.get(so.operationName) + mets.Requests.Inc(1) + if so.err { + mets.Failures.Inc(1) + } else { + mets.Success.Inc(1) + } + mets.RequestLatencyMs.Record(options.FinishTime.Sub(so.startTime)) + mets.recordHTTPStatusCode(so.httpStatusCode) +} + +// OnSetOperationName records new operation name. +func (so *SpanObserver) OnSetOperationName(operationName string) { + so.mux.Lock() + so.operationName = operationName + so.mux.Unlock() +} + +// OnSetTag implements SpanObserver +func (so *SpanObserver) OnSetTag(key string, value interface{}) { + so.mux.Lock() + so.handleTagInLock(key, value) + so.mux.Unlock() +} diff --git a/rpcmetrics/observer_test.go b/rpcmetrics/observer_test.go new file mode 100644 index 00000000..9c2d6c66 --- /dev/null +++ b/rpcmetrics/observer_test.go @@ -0,0 +1,186 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package rpcmetrics + +import ( + "fmt" + "testing" + "time" + + opentracing "github.com/opentracing/opentracing-go" + "github.com/stretchr/testify/assert" + "github.com/uber/jaeger-lib/metrics" + u "github.com/uber/jaeger-lib/metrics/testutils" + + "github.com/opentracing/opentracing-go/ext" + jaeger "github.com/uber/jaeger-client-go" +) + +func ExampleObserver() { + metricsFactory := metrics.NewLocalFactory(0) + metricsObserver := NewObserver( + metricsFactory, + DefaultNameNormalizer, + ) + tracer, closer := jaeger.NewTracer( + "serviceName", + jaeger.NewConstSampler(true), + jaeger.NewInMemoryReporter(), + jaeger.TracerOptions.Observer(metricsObserver), + ) + defer closer.Close() + + span := tracer.StartSpan("test", ext.SpanKindRPCServer) + span.Finish() + + c, _ := metricsFactory.Snapshot() + fmt.Printf("requests: %d\n", c["requests|endpoint=test"]) + fmt.Printf("success: %d\n", c["success|endpoint=test"]) + fmt.Printf("errors: %d\n", c["errors|endpoint=test"]) + // Output: + // requests: 1 + // success: 1 + // errors: 0 +} + +type testTracer struct { + metrics *metrics.LocalFactory + tracer opentracing.Tracer +} + +func withTestTracer(runTest func(tt *testTracer)) { + sampler := jaeger.NewConstSampler(true) + reporter := jaeger.NewInMemoryReporter() + metrics := metrics.NewLocalFactory(time.Minute) + observer := NewObserver(metrics, DefaultNameNormalizer) + tracer, closer := jaeger.NewTracer( + "test", + sampler, + reporter, + jaeger.TracerOptions.Observer(observer)) + defer closer.Close() + runTest(&testTracer{ + metrics: metrics, + tracer: tracer, + }) +} + +func TestObserver(t *testing.T) { + withTestTracer(func(testTracer *testTracer) { + ts := time.Now() + finishOptions := opentracing.FinishOptions{ + FinishTime: ts.Add(50 * time.Millisecond), + } + + testCases := []struct { + name string + tag opentracing.Tag + opNameOverride string + }{ + {name: "local-span", tag: opentracing.Tag{Key: "x", Value: "y"}}, + {name: "get-user", tag: ext.SpanKindRPCServer}, + {name: "get-user", tag: ext.SpanKindRPCServer, opNameOverride: "get-user-override"}, + {name: "get-user-client", tag: ext.SpanKindRPCClient}, + } + + for _, testCase := range testCases { + span := testTracer.tracer.StartSpan( + testCase.name, + testCase.tag, + opentracing.StartTime(ts), + ) + if testCase.opNameOverride != "" { + span.SetOperationName(testCase.opNameOverride) + } + span.FinishWithOptions(finishOptions) + } + + u.AssertCounterMetrics(t, + testTracer.metrics, + u.ExpectedMetric{Name: "requests", Tags: endpointTags("local-span"), Value: 0}, + u.ExpectedMetric{Name: "success", Tags: endpointTags("local-span"), Value: 0}, + u.ExpectedMetric{Name: "requests", Tags: endpointTags("get-user"), Value: 1}, + u.ExpectedMetric{Name: "success", Tags: endpointTags("get-user"), Value: 1}, + u.ExpectedMetric{Name: "requests", Tags: endpointTags("get-user-override"), Value: 1}, + u.ExpectedMetric{Name: "success", Tags: endpointTags("get-user-override"), Value: 1}, + u.ExpectedMetric{Name: "requests", Tags: endpointTags("get-user-client"), Value: 0}, + u.ExpectedMetric{Name: "success", Tags: endpointTags("get-user-client"), Value: 0}, + ) + // TODO something wrong with string generation, .P99 should not be appended to the tag + // as a result we cannot use u.AssertGaugeMetrics + _, g := testTracer.metrics.Snapshot() + assert.EqualValues(t, 51, g["request_latency_ms|endpoint=get-user.P99"]) + }) +} + +func TestTags(t *testing.T) { + type tagTestCase struct { + key string + value interface{} + metrics []u.ExpectedMetric + } + + testCases := []tagTestCase{ + {key: "something", value: 42, metrics: []u.ExpectedMetric{ + {Name: "success", Value: 1}, + {Name: "requests", Value: 1}, + }}, + {key: "error", value: true, metrics: []u.ExpectedMetric{ + {Name: "failures", Value: 1}, + {Name: "requests", Value: 1}, + }}, + {key: "error", value: "true", metrics: []u.ExpectedMetric{ + {Name: "failures", Value: 1}, + {Name: "requests", Value: 1}, + }}, + } + + for i := 2; i <= 5; i++ { + values := []interface{}{ + i * 100, + uint16(i * 100), + fmt.Sprintf("%d00", i), + } + for _, v := range values { + testCases = append(testCases, tagTestCase{ + key: "http.status_code", value: v, metrics: []u.ExpectedMetric{ + {Name: "requests", Value: 1}, + {Name: fmt.Sprintf("http_status_code_%dxx", i), Value: 1}, + }, + }) + } + } + + for _, tc := range testCases { + testCase := tc // capture loop var + for i := range testCase.metrics { + testCase.metrics[i].Tags = endpointTags("span") + } + t.Run(fmt.Sprintf("%s-%v", testCase.key, testCase.value), func(t *testing.T) { + withTestTracer(func(testTracer *testTracer) { + span := testTracer.tracer.StartSpan("span", ext.SpanKindRPCServer) + span.SetTag(testCase.key, testCase.value) + span.Finish() + u.AssertCounterMetrics(t, testTracer.metrics, testCase.metrics...) + }) + }) + } +}