Skip to content
This repository has been archived by the owner on May 23, 2024. It is now read-only.

Commit

Permalink
Add RPC metrics emitter as Observer/SpanObserver (#103)
Browse files Browse the repository at this point in the history
  • Loading branch information
yurishkuro authored Mar 3, 2017
1 parent 1f64068 commit fc18ec3
Show file tree
Hide file tree
Showing 12 changed files with 846 additions and 2 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ sudo: required
language: go

go:
- 1.6
- 1.7

services:
Expand Down
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ Changes by Version
2.2.0 (Unreleased)
------------------

- Introduce Observer and SpanObserver
- Introduce Observer and SpanObserver (https://github.com/uber/jaeger-client-go/pull/94)
- Add RPC metrics emitter as Observer/SpanObserver (https://github.com/uber/jaeger-client-go/pull/103)


2.1.2 (2016-02-27)
Expand Down
5 changes: 5 additions & 0 deletions rpcmetrics/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
An Observer that can be used to emit RPC metrics
================================================

It can be attached to the tracer during tracer construction.
See `ExampleObserver` function in [observer_test.go](./observer_test.go).
22 changes: 22 additions & 0 deletions rpcmetrics/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

// Package rpcmetrics implements an Observer that can be used to emit RPC metrics.
package rpcmetrics
69 changes: 69 additions & 0 deletions rpcmetrics/endpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package rpcmetrics

import "sync"

// normalizedEndpoints is a cache for endpointName -> safeName mappings.
type normalizedEndpoints struct {
names map[string]string
maxSize int
defaultName string
normalizer NameNormalizer
mux sync.RWMutex
}

func newNormalizedEndpoints(maxSize int, normalizer NameNormalizer) *normalizedEndpoints {
return &normalizedEndpoints{
maxSize: maxSize,
normalizer: normalizer,
names: make(map[string]string, maxSize),
}
}

// normalize looks up the name in the cache, if not found it uses normalizer
// to convert the name to a safe name. If called with more than maxSize unique
// names it returns "" for all other names beyond those already cached.
func (n *normalizedEndpoints) normalize(name string) string {
n.mux.RLock()
norm, ok := n.names[name]
l := len(n.names)
n.mux.RUnlock()
if ok {
return norm
}
if l >= n.maxSize {
return ""
}
return n.normalizeWithLock(name)
}

func (n *normalizedEndpoints) normalizeWithLock(name string) string {
norm := n.normalizer.Normalize(name)
n.mux.Lock()
defer n.mux.Unlock()
// cache may have grown while we were not holding the lock
if len(n.names) >= n.maxSize {
return ""
}
n.names[name] = norm
return norm
}
49 changes: 49 additions & 0 deletions rpcmetrics/endpoints_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package rpcmetrics

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestNormalizedEndpoints(t *testing.T) {
n := newNormalizedEndpoints(1, DefaultNameNormalizer)

assertLen := func(l int) {
n.mux.RLock()
defer n.mux.RUnlock()
assert.Len(t, n.names, l)
}

assert.Equal(t, "ab-cd", n.normalize("ab^cd"), "one translation")
assert.Equal(t, "ab-cd", n.normalize("ab^cd"), "cache hit")
assertLen(1)
assert.Equal(t, "", n.normalize("xys"), "cache overflow")
assertLen(1)
}

func TestNormalizedEndpointsDoubleLocking(t *testing.T) {
n := newNormalizedEndpoints(1, DefaultNameNormalizer)
assert.Equal(t, "ab-cd", n.normalize("ab^cd"), "fill out the cache")
assert.Equal(t, "", n.normalizeWithLock("xys"), "cache overflow")
}
130 changes: 130 additions & 0 deletions rpcmetrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package rpcmetrics

import (
"sync"

"github.com/uber/jaeger-lib/metrics"
)

const (
otherEndpointsPlaceholder = "other"
endpointNameMetricTag = "endpoint"
)

// Metrics is a collection of metrics for an endpoint describing
// throughput, success, errors, and performance.
type Metrics struct {
// Requests is a counter of the total number of successes + failures.
Requests metrics.Counter `metric:"requests"`

// Success is a counter of the total number of successes.
Success metrics.Counter `metric:"success"`

// Failures is a counter of the number of times any failure has been observed.
Failures metrics.Counter `metric:"failures"`

// RequestLatencyMs is a histogram of the latency of requests in milliseconds.
RequestLatencyMs metrics.Timer `metric:"request_latency_ms"`

// HTTPStatusCode2xx is a counter of the total number of requests with HTTP status code 200-299
HTTPStatusCode2xx metrics.Counter `metric:"http_status_code_2xx"`

// HTTPStatusCode3xx is a counter of the total number of requests with HTTP status code 300-399
HTTPStatusCode3xx metrics.Counter `metric:"http_status_code_3xx"`

// HTTPStatusCode4xx is a counter of the total number of requests with HTTP status code 400-499
HTTPStatusCode4xx metrics.Counter `metric:"http_status_code_4xx"`

// HTTPStatusCode5xx is a counter of the total number of requests with HTTP status code 500-599
HTTPStatusCode5xx metrics.Counter `metric:"http_status_code_5xx"`
}

func (m *Metrics) recordHTTPStatusCode(statusCode uint16) {
if statusCode >= 200 && statusCode < 300 {
m.HTTPStatusCode2xx.Inc(1)
} else if statusCode >= 300 && statusCode < 400 {
m.HTTPStatusCode3xx.Inc(1)
} else if statusCode >= 400 && statusCode < 500 {
m.HTTPStatusCode4xx.Inc(1)
} else if statusCode >= 500 && statusCode < 600 {
m.HTTPStatusCode5xx.Inc(1)
}
}

// MetricsByEndpoint is a registry/cache of metrics for each unique endpoint name.
// Only maxNumberOfEndpoints Metrics are stored, all other endpoint names are mapped
// to a generic endpoint name "other".
type MetricsByEndpoint struct {
metricsFactory metrics.Factory
endpoints *normalizedEndpoints
metricsByEndpoint map[string]*Metrics
mux sync.RWMutex
}

func newMetricsByEndpoint(
metricsFactory metrics.Factory,
normalizer NameNormalizer,
maxNumberOfEndpoints int,
) *MetricsByEndpoint {
return &MetricsByEndpoint{
metricsFactory: metricsFactory,
endpoints: newNormalizedEndpoints(maxNumberOfEndpoints, normalizer),
metricsByEndpoint: make(map[string]*Metrics, maxNumberOfEndpoints+1), // +1 for "other"
}
}

func (m *MetricsByEndpoint) get(endpoint string) *Metrics {
safeName := m.endpoints.normalize(endpoint)
if safeName == "" {
safeName = otherEndpointsPlaceholder
}
m.mux.RLock()
met := m.metricsByEndpoint[safeName]
m.mux.RUnlock()
if met != nil {
return met
}

return m.getWithWriteLock(safeName)
}

// split to make easier to test
func (m *MetricsByEndpoint) getWithWriteLock(safeName string) *Metrics {
m.mux.Lock()
defer m.mux.Unlock()

// it is possible that the name has been already registered after we released
// the read lock and before we grabbed the write lock, so check for that.
if met, ok := m.metricsByEndpoint[safeName]; ok {
return met
}

// it would be nice to create the struct before locking, since Init() is somewhat
// expensive, however some metrics backends (e.g. expvar) may not like duplicate metrics.
met := &Metrics{}
tags := map[string]string{endpointNameMetricTag: safeName}
metrics.Init(met, m.metricsFactory, tags)

m.metricsByEndpoint[safeName] = met
return met
}
60 changes: 60 additions & 0 deletions rpcmetrics/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package rpcmetrics

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/uber/jaeger-lib/metrics"
"github.com/uber/jaeger-lib/metrics/testutils"
)

func endpointTags(endpoint string) map[string]string {
return map[string]string{
"endpoint": endpoint,
}
}

func TestMetricsByEndpoint(t *testing.T) {
met := metrics.NewLocalFactory(0)
mbe := newMetricsByEndpoint(met, DefaultNameNormalizer, 2)

m1 := mbe.get("abc1")
m2 := mbe.get("abc1") // from cache
m2a := mbe.getWithWriteLock("abc1") // from cache in double-checked lock
assert.Equal(t, m1, m2)
assert.Equal(t, m1, m2a)

m3 := mbe.get("abc3")
m4 := mbe.get("overflow")
m5 := mbe.get("overflow2")

for _, m := range []*Metrics{m1, m2, m2a, m3, m4, m5} {
m.Requests.Inc(1)
}

testutils.AssertCounterMetrics(t, met,
testutils.ExpectedMetric{Name: "requests", Tags: endpointTags("abc1"), Value: 3},
testutils.ExpectedMetric{Name: "requests", Tags: endpointTags("abc3"), Value: 1},
testutils.ExpectedMetric{Name: "requests", Tags: endpointTags("other"), Value: 2},
)
}
Loading

0 comments on commit fc18ec3

Please sign in to comment.