From de5e73a2f4954152d4198bf01099c5d04ba04850 Mon Sep 17 00:00:00 2001 From: djshow832 Date: Thu, 13 Jun 2024 09:46:29 +0800 Subject: [PATCH] metrics, balance: add metrics for load balance (#556) --- lib/config/getter.go | 8 + pkg/balance/factor/factor_balance.go | 9 +- pkg/balance/factor/factor_balance_test.go | 7 +- pkg/balance/factor/factor_location.go | 15 +- pkg/balance/factor/factor_location_test.go | 105 +--------- pkg/balance/factor/mock_test.go | 5 + pkg/balance/metricsreader/mock_test.go | 4 + pkg/balance/observer/backend_health.go | 31 ++- pkg/balance/observer/backend_observer.go | 8 +- pkg/balance/observer/backend_observer_test.go | 71 ++++++- pkg/balance/observer/mock_test.go | 28 ++- pkg/balance/policy/balance_policy.go | 3 +- pkg/balance/policy/mock_test.go | 4 + pkg/balance/policy/simple_policy.go | 10 +- pkg/balance/policy/simple_policy_test.go | 5 +- pkg/balance/router/metrics.go | 14 +- pkg/balance/router/mock_test.go | 6 +- pkg/balance/router/router.go | 10 + pkg/balance/router/router_score.go | 12 +- pkg/balance/router/router_score_test.go | 4 +- pkg/balance/router/router_static.go | 4 + pkg/manager/namespace/manager.go | 2 +- pkg/metrics/balance.go | 3 +- pkg/metrics/grafana/tiproxy_summary.json | 190 +++++++++++++++++- pkg/metrics/grafana/tiproxy_summary.jsonnet | 30 +++ pkg/metrics/traffic.go | 8 + pkg/proxy/backend/backend_conn_mgr.go | 2 +- pkg/proxy/backend/backend_conn_mgr_test.go | 33 ++- pkg/proxy/backend/metrics.go | 5 +- 29 files changed, 482 insertions(+), 154 deletions(-) create mode 100644 lib/config/getter.go diff --git a/lib/config/getter.go b/lib/config/getter.go new file mode 100644 index 00000000..055ece99 --- /dev/null +++ b/lib/config/getter.go @@ -0,0 +1,8 @@ +// Copyright 2024 PingCAP, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package config + +type ConfigGetter interface { + GetConfig() *Config +} diff --git a/pkg/balance/factor/factor_balance.go b/pkg/balance/factor/factor_balance.go index c335690b..27eed46b 100644 --- a/pkg/balance/factor/factor_balance.go +++ b/pkg/balance/factor/factor_balance.go @@ -176,7 +176,7 @@ func (fbb *FactorBasedBalance) BackendToRoute(backends []policy.BackendCtx) poli // BackendsToBalance returns the busiest/unhealthy backend and the idlest backend. // balanceCount: the count of connections to migrate in this round. 0 indicates no need to balance. // reason: the debug information to be logged. -func (fbb *FactorBasedBalance) BackendsToBalance(backends []policy.BackendCtx) (from, to policy.BackendCtx, balanceCount int, reason []zap.Field) { +func (fbb *FactorBasedBalance) BackendsToBalance(backends []policy.BackendCtx) (from, to policy.BackendCtx, balanceCount int, reason string, logFields []zap.Field) { if len(backends) <= 1 { return } @@ -226,16 +226,17 @@ func (fbb *FactorBasedBalance) BackendsToBalance(backends []policy.BackendCtx) ( // backend1 factor scores: 1, 0, 1 // backend2 factor scores: 0, 1, 0 // Balancing the third factor may make the second factor unbalanced, although it's in the same order with the first factor. - return nil, nil, 0, nil + return } leftBitNum -= bitNum } + reason = factor.Name() fields := []zap.Field{ - zap.String("factor", factor.Name()), + zap.String("factor", reason), zap.Uint64("from_score", maxScore), zap.Uint64("to_score", minScore), } - return busiestBackend.BackendCtx, idlestBackend.BackendCtx, balanceCount, fields + return busiestBackend.BackendCtx, idlestBackend.BackendCtx, balanceCount, reason, fields } func (fbb *FactorBasedBalance) SetConfig(cfg *config.Config) { diff --git a/pkg/balance/factor/factor_balance_test.go b/pkg/balance/factor/factor_balance_test.go index b46d8d1f..6bde36c1 100644 --- a/pkg/balance/factor/factor_balance_test.go +++ b/pkg/balance/factor/factor_balance_test.go @@ -136,11 +136,12 @@ func TestBalanceWithOneFactor(t *testing.T) { } } backends := createBackends(len(test.scores)) - from, to, count, _ := fm.BackendsToBalance(backends) + from, to, count, reason, _ := fm.BackendsToBalance(backends) require.Equal(t, test.count, count, "test index %d", tIdx) if test.count > 0 { require.Equal(t, backends[test.fromIdx], from, "test index %d", tIdx) require.Equal(t, backends[test.toIdx], to, "test index %d", tIdx) + require.Equal(t, "mock", reason, "test index %d", tIdx) } else { require.Nil(t, from, "test index %d", tIdx) require.Nil(t, to, "test index %d", tIdx) @@ -215,7 +216,7 @@ func TestBalanceWith2Factors(t *testing.T) { } } backends := createBackends(len(test.scores1)) - from, to, count, _ := fm.BackendsToBalance(backends) + from, to, count, _, _ := fm.BackendsToBalance(backends) require.Equal(t, test.count, count, "test index %d", tIdx) if test.count > 0 { require.Equal(t, backends[test.fromIdx], from, "test index %d", tIdx) @@ -266,7 +267,7 @@ func TestBalanceWith3Factors(t *testing.T) { }(factorIdx, factor) } backends := createBackends(len(test.scores)) - from, to, count, _ := fm.BackendsToBalance(backends) + from, to, count, _, _ := fm.BackendsToBalance(backends) require.Equal(t, test.count, count, "test index %d", tIdx) if test.count > 0 { require.Equal(t, backends[test.fromIdx], from, "test index %d", tIdx) diff --git a/pkg/balance/factor/factor_location.go b/pkg/balance/factor/factor_location.go index 8beb90ed..0150c35a 100644 --- a/pkg/balance/factor/factor_location.go +++ b/pkg/balance/factor/factor_location.go @@ -6,9 +6,6 @@ package factor import "github.com/pingcap/tiproxy/lib/config" const ( - // locationLabelName indicates the label name that location-based balance should be based on. - // We use `zone` because the follower read in TiDB also uses `zone` to decide location. - locationLabelName = "zone" // balanceCount4Location indicates how many connections to balance per second. balanceCount4Location = 1 ) @@ -16,9 +13,7 @@ const ( var _ Factor = (*FactorLabel)(nil) type FactorLocation struct { - // The location of this tiproxy instance. - selfLocation string - bitNum int + bitNum int } func NewFactorLocation() *FactorLocation { @@ -32,13 +27,12 @@ func (fl *FactorLocation) Name() string { } func (fl *FactorLocation) UpdateScore(backends []scoredBackend) { - if len(fl.selfLocation) == 0 || len(backends) <= 1 { + if len(backends) <= 1 { return } for i := 0; i < len(backends); i++ { score := 1 - backendLabels := backends[i].GetBackendInfo().Labels - if backendLabels != nil && backendLabels[locationLabelName] == fl.selfLocation { + if backends[i].Local() { score = 0 } backends[i].addScore(score, fl.bitNum) @@ -54,9 +48,6 @@ func (fl *FactorLocation) BalanceCount(from, to scoredBackend) int { } func (fl *FactorLocation) SetConfig(cfg *config.Config) { - if cfg.Labels != nil { - fl.selfLocation = cfg.Labels[locationLabelName] - } } func (fl *FactorLocation) Close() { diff --git a/pkg/balance/factor/factor_location_test.go b/pkg/balance/factor/factor_location_test.go index 69216fa3..c08e73e8 100644 --- a/pkg/balance/factor/factor_location_test.go +++ b/pkg/balance/factor/factor_location_test.go @@ -6,120 +6,33 @@ package factor import ( "testing" - "github.com/pingcap/tiproxy/lib/config" - "github.com/pingcap/tiproxy/pkg/balance/observer" "github.com/stretchr/testify/require" ) -func TestFactorLocationOneBackend(t *testing.T) { +func TestFactorLocationScore(t *testing.T) { tests := []struct { - selfLocation string - backendLocation string - expectedScore uint64 - }{ - {}, - { - selfLocation: "az1", - expectedScore: 1, - }, - { - backendLocation: "az1", - }, - { - selfLocation: "az1", - backendLocation: "az2", - expectedScore: 1, - }, - { - selfLocation: "az1", - backendLocation: "az1", - }, - } - - factor := NewFactorLocation() - for i, test := range tests { - var backendLabels map[string]string - if test.backendLocation != "" { - backendLabels = map[string]string{ - locationLabelName: test.backendLocation, - } - } - backendCtx := &mockBackend{ - BackendInfo: observer.BackendInfo{Labels: backendLabels}, - } - // Create 2 backends so that UpdateScore won't skip calculating scores. - backends := []scoredBackend{ - { - BackendCtx: backendCtx, - }, - { - BackendCtx: backendCtx, - }, - } - var selfLabels map[string]string - if test.selfLocation != "" { - selfLabels = map[string]string{ - locationLabelName: test.selfLocation, - } - } - factor.SetConfig(&config.Config{ - Labels: selfLabels, - }) - factor.UpdateScore(backends) - for _, backend := range backends { - require.Equal(t, test.expectedScore, backend.score(), "test idx: %d", i) - } - } -} - -func TestFactorLocationMultiBackends(t *testing.T) { - tests := []struct { - labels map[string]string + local bool expectedScore uint64 }{ { + local: false, expectedScore: 1, }, { - labels: map[string]string{ - locationLabelName: "az1", - }, - expectedScore: 0, - }, - { - labels: map[string]string{ - "z": "az1", - }, - expectedScore: 1, - }, - { - labels: map[string]string{ - locationLabelName: "az2", - "z": "az1", - }, - expectedScore: 1, - }, - { - labels: map[string]string{ - locationLabelName: "az1", - "z": "az2", - }, + local: true, expectedScore: 0, }, } + + factor := NewFactorLocation() backends := make([]scoredBackend, 0, len(tests)) for _, test := range tests { - backend := scoredBackend{ + backends = append(backends, scoredBackend{ BackendCtx: &mockBackend{ - BackendInfo: observer.BackendInfo{Labels: test.labels}, + local: test.local, }, - } - backends = append(backends, backend) + }) } - factor := NewFactorLocation() - factor.SetConfig(&config.Config{ - Labels: map[string]string{locationLabelName: "az1"}, - }) factor.UpdateScore(backends) for i, test := range tests { require.Equal(t, test.expectedScore, backends[i].score(), "test idx: %d", i) diff --git a/pkg/balance/factor/mock_test.go b/pkg/balance/factor/mock_test.go index 1e64137e..083554b2 100644 --- a/pkg/balance/factor/mock_test.go +++ b/pkg/balance/factor/mock_test.go @@ -24,6 +24,7 @@ type mockBackend struct { connScore int connCount int healthy bool + local bool } func newMockBackend(healthy bool, connScore int) *mockBackend { @@ -53,6 +54,10 @@ func (mb *mockBackend) GetBackendInfo() observer.BackendInfo { return mb.BackendInfo } +func (mb *mockBackend) Local() bool { + return mb.local +} + var _ Factor = (*mockFactor)(nil) type mockFactor struct { diff --git a/pkg/balance/metricsreader/mock_test.go b/pkg/balance/metricsreader/mock_test.go index 658693e9..668d92b4 100644 --- a/pkg/balance/metricsreader/mock_test.go +++ b/pkg/balance/metricsreader/mock_test.go @@ -118,3 +118,7 @@ func (mb *mockBackend) Addr() string { func (mb *mockBackend) GetBackendInfo() observer.BackendInfo { return mb.BackendInfo } + +func (mb *mockBackend) Local() bool { + return true +} diff --git a/pkg/balance/observer/backend_health.go b/pkg/balance/observer/backend_health.go index 92605f7c..214f0ece 100644 --- a/pkg/balance/observer/backend_health.go +++ b/pkg/balance/observer/backend_health.go @@ -3,7 +3,17 @@ package observer -import "fmt" +import ( + "fmt" + + "github.com/pingcap/tiproxy/lib/config" +) + +var ( + // locationLabelName indicates the label name that decides the location of TiProxy and backends. + // We use `zone` because the follower read in TiDB also uses `zone` to decide location. + locationLabelName = "zone" +) type BackendHealth struct { BackendInfo @@ -12,6 +22,25 @@ type BackendHealth struct { PingErr error // The backend version that returned to the client during handshake. ServerVersion string + // Whether the backend in the same zone with TiProxy. If TiProxy location is undefined, take all backends as local. + Local bool +} + +func (bh *BackendHealth) setLocal(cfg *config.Config) { + if cfg.Labels == nil { + bh.Local = true + return + } + selfLocation, ok := cfg.Labels[locationLabelName] + if !ok || len(selfLocation) == 0 { + bh.Local = true + return + } + if bh.Labels != nil && bh.Labels[locationLabelName] == selfLocation { + bh.Local = true + return + } + bh.Local = false } func (bh *BackendHealth) Equals(health BackendHealth) bool { diff --git a/pkg/balance/observer/backend_observer.go b/pkg/balance/observer/backend_observer.go index d9dd0b5d..f33fab08 100644 --- a/pkg/balance/observer/backend_observer.go +++ b/pkg/balance/observer/backend_observer.go @@ -41,6 +41,7 @@ type DefaultBackendObserver struct { refreshChan chan struct{} fetcher BackendFetcher hc HealthCheck + cfgGetter config.ConfigGetter cancelFunc context.CancelFunc logger *zap.Logger healthCheckConfig *config.HealthCheck @@ -48,8 +49,8 @@ type DefaultBackendObserver struct { } // NewDefaultBackendObserver creates a BackendObserver. -func NewDefaultBackendObserver(logger *zap.Logger, config *config.HealthCheck, - backendFetcher BackendFetcher, hc HealthCheck) *DefaultBackendObserver { +func NewDefaultBackendObserver(logger *zap.Logger, config *config.HealthCheck, backendFetcher BackendFetcher, hc HealthCheck, + cfgGetter config.ConfigGetter) *DefaultBackendObserver { config.Check() bo := &DefaultBackendObserver{ logger: logger, @@ -60,6 +61,7 @@ func NewDefaultBackendObserver(logger *zap.Logger, config *config.HealthCheck, fetcher: backendFetcher, subscribers: make(map[string]chan HealthResult), curBackends: make(map[string]*BackendHealth), + cfgGetter: cfgGetter, } return bo } @@ -126,6 +128,7 @@ func (bo *DefaultBackendObserver) checkHealth(ctx context.Context, backends map[ // Each goroutine checks one backend. var lock sync.Mutex + cfg := bo.cfgGetter.GetConfig() for addr, info := range backends { func(addr string, info *BackendInfo) { bo.wgp.RunWithRecover(func() { @@ -133,6 +136,7 @@ func (bo *DefaultBackendObserver) checkHealth(ctx context.Context, backends map[ return } health := bo.hc.Check(ctx, addr, info) + health.setLocal(cfg) lock.Lock() curBackendHealth[addr] = health lock.Unlock() diff --git a/pkg/balance/observer/backend_observer_test.go b/pkg/balance/observer/backend_observer_test.go index ae0926c3..7cd96e21 100644 --- a/pkg/balance/observer/backend_observer_test.go +++ b/pkg/balance/observer/backend_observer_test.go @@ -148,6 +148,71 @@ func TestMultiSubscribers(t *testing.T) { } } +func TestLocal(t *testing.T) { + ts := newObserverTestSuite(t) + cfgGetter := ts.bo.cfgGetter.(*mockConfigGetter) + t.Cleanup(ts.close) + ts.bo.Start(context.Background()) + + tests := []struct { + selfLabels map[string]string + backendLabels map[string]string + local bool + }{ + { + selfLabels: nil, + backendLabels: nil, + local: true, + }, + { + selfLabels: nil, + backendLabels: map[string]string{"a": "b"}, + local: true, + }, + { + selfLabels: map[string]string{"a": "b"}, + backendLabels: map[string]string{"a": "b"}, + local: true, + }, + { + selfLabels: map[string]string{locationLabelName: "b"}, + backendLabels: map[string]string{"a": "b"}, + local: false, + }, + { + selfLabels: map[string]string{locationLabelName: "b"}, + backendLabels: map[string]string{locationLabelName: "c"}, + local: false, + }, + { + selfLabels: map[string]string{locationLabelName: "b"}, + backendLabels: map[string]string{locationLabelName: "c", "a": "c"}, + local: false, + }, + { + selfLabels: map[string]string{locationLabelName: "b"}, + backendLabels: map[string]string{locationLabelName: "b", "a": "c"}, + local: true, + }, + } + + for i, test := range tests { + backend, _ := ts.addBackend() + ts.setLabels(backend, test.backendLabels) + cfg := &config.Config{ + Labels: test.selfLabels, + } + cfgGetter.setConfig(cfg) + require.Eventually(ts.t, func() bool { + result := ts.getResultFromCh() + require.NoError(ts.t, result.Error()) + health := result.Backends()[backend] + return health.Local == test.local + }, 3*time.Second, 10*time.Millisecond, "test case %d", i) + ts.removeBackend(backend) + } +} + type observerTestSuite struct { t *testing.T bo *DefaultBackendObserver @@ -161,7 +226,7 @@ func newObserverTestSuite(t *testing.T) *observerTestSuite { fetcher := newMockBackendFetcher() hc := newMockHealthCheck() lg, _ := logger.CreateLoggerForTest(t) - bo := NewDefaultBackendObserver(lg, newHealthCheckConfigForTest(), fetcher, hc) + bo := NewDefaultBackendObserver(lg, newHealthCheckConfigForTest(), fetcher, hc, newMockConfigGetter(&config.Config{})) subscriber := bo.Subscribe("receiver") return &observerTestSuite{ t: t, @@ -229,6 +294,10 @@ func (ts *observerTestSuite) setHealth(addr string, healthy bool) { ts.hc.setHealth(addr, healthy) } +func (ts *observerTestSuite) setLabels(addr string, labels map[string]string) { + ts.fetcher.setLabels(addr, labels) +} + func (ts *observerTestSuite) removeBackend(addr string) { ts.fetcher.removeBackend(addr) ts.hc.removeBackend(addr) diff --git a/pkg/balance/observer/mock_test.go b/pkg/balance/observer/mock_test.go index 1c3761f7..a2cdeba1 100644 --- a/pkg/balance/observer/mock_test.go +++ b/pkg/balance/observer/mock_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/pingcap/tiproxy/lib/config" "github.com/pingcap/tiproxy/pkg/manager/infosync" "go.uber.org/atomic" ) @@ -57,6 +58,12 @@ func (mbf *mockBackendFetcher) setBackend(addr string, info *BackendInfo) { mbf.backends[addr] = info } +func (mbf *mockBackendFetcher) setLabels(addr string, labels map[string]string) { + mbf.Lock() + defer mbf.Unlock() + mbf.backends[addr].Labels = labels +} + func (mbf *mockBackendFetcher) removeBackend(addr string) { mbf.Lock() defer mbf.Unlock() @@ -74,9 +81,10 @@ func newMockHealthCheck() *mockHealthCheck { } } -func (mhc *mockHealthCheck) Check(_ context.Context, addr string, _ *BackendInfo) *BackendHealth { +func (mhc *mockHealthCheck) Check(_ context.Context, addr string, info *BackendInfo) *BackendHealth { mhc.Lock() defer mhc.Unlock() + mhc.backends[addr].BackendInfo = *info return mhc.backends[addr] } @@ -131,3 +139,21 @@ func (handler *mockHttpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request w.WriteHeader(http.StatusInternalServerError) } } + +type mockConfigGetter struct { + cfg atomic.Pointer[config.Config] +} + +func (cfgGetter *mockConfigGetter) GetConfig() *config.Config { + return cfgGetter.cfg.Load() +} + +func (cfgGetter *mockConfigGetter) setConfig(cfg *config.Config) { + cfgGetter.cfg.Store(cfg) +} + +func newMockConfigGetter(cfg *config.Config) *mockConfigGetter { + cfgGetter := &mockConfigGetter{} + cfgGetter.setConfig(cfg) + return cfgGetter +} diff --git a/pkg/balance/policy/balance_policy.go b/pkg/balance/policy/balance_policy.go index 8a2e524b..3db21fad 100644 --- a/pkg/balance/policy/balance_policy.go +++ b/pkg/balance/policy/balance_policy.go @@ -13,7 +13,7 @@ type BalancePolicy interface { Init(cfg *config.Config) BackendToRoute(backends []BackendCtx) BackendCtx // balanceCount is the count of connections to balance per second. - BackendsToBalance(backends []BackendCtx) (from, to BackendCtx, balanceCount int, reason []zap.Field) + BackendsToBalance(backends []BackendCtx) (from, to BackendCtx, balanceCount int, reason string, logFields []zap.Field) SetConfig(cfg *config.Config) } @@ -24,5 +24,6 @@ type BackendCtx interface { // ConnScore = current connections + incoming connections - outgoing connections. ConnScore() int Healthy() bool + Local() bool GetBackendInfo() observer.BackendInfo } diff --git a/pkg/balance/policy/mock_test.go b/pkg/balance/policy/mock_test.go index a67be22e..61105d3a 100644 --- a/pkg/balance/policy/mock_test.go +++ b/pkg/balance/policy/mock_test.go @@ -35,6 +35,10 @@ func (mb *mockBackend) Addr() string { return "" } +func (mb *mockBackend) Local() bool { + return true +} + func (mb *mockBackend) GetBackendInfo() observer.BackendInfo { return observer.BackendInfo{} } diff --git a/pkg/balance/policy/simple_policy.go b/pkg/balance/policy/simple_policy.go index 691c0423..bc3e68cb 100644 --- a/pkg/balance/policy/simple_policy.go +++ b/pkg/balance/policy/simple_policy.go @@ -44,24 +44,26 @@ func (sbp *SimpleBalancePolicy) BackendToRoute(backends []BackendCtx) BackendCtx return nil } -func (sbp *SimpleBalancePolicy) BackendsToBalance(backends []BackendCtx) (from, to BackendCtx, balanceCount int, reason []zap.Field) { +func (sbp *SimpleBalancePolicy) BackendsToBalance(backends []BackendCtx) (from, to BackendCtx, balanceCount int, reason string, logFields []zap.Field) { if len(backends) <= 1 { return } sortBackends(backends) from, to = backends[len(backends)-1], backends[0] if !to.Healthy() || from.ConnScore() <= 0 { - return nil, nil, 0, nil + return nil, nil, 0, "", nil } if !from.Healthy() { balanceCount = BalanceCount4Health + reason = "status" } else { if float64(from.ConnScore()) <= float64(to.ConnScore()+1)*ConnBalancedRatio { - return nil, nil, 0, nil + return nil, nil, 0, "", nil } balanceCount = 1 + reason = "conn" } - reason = []zap.Field{ + logFields = []zap.Field{ zap.Bool("from_healthy", from.Healthy()), zap.Bool("to_healthy", to.Healthy()), zap.Int("from_score", from.ConnScore()), diff --git a/pkg/balance/policy/simple_policy_test.go b/pkg/balance/policy/simple_policy_test.go index 874276bd..085ff1fd 100644 --- a/pkg/balance/policy/simple_policy_test.go +++ b/pkg/balance/policy/simple_policy_test.go @@ -15,6 +15,7 @@ func TestSimplePolicy(t *testing.T) { routeIdx int fromIdx int toIdx int + reason string }{ { backends: []BackendCtx{newMockBackend(false, 0)}, @@ -45,6 +46,7 @@ func TestSimplePolicy(t *testing.T) { routeIdx: 1, fromIdx: 0, toIdx: 1, + reason: "conn", }, { backends: []BackendCtx{newMockBackend(true, 0), newMockBackend(true, 0)}, @@ -79,12 +81,13 @@ func TestSimplePolicy(t *testing.T) { } else { require.Nil(t, backend) } - from, to, _, _ := sbp.BackendsToBalance(test.backends) + from, to, _, reason, _ := sbp.BackendsToBalance(test.backends) if test.fromIdx >= 0 { require.Equal(t, fromBackend.healthy, from.Healthy(), "test idx: %d", idx) require.Equal(t, fromBackend.connScore, from.ConnScore(), "test idx: %d", idx) require.Equal(t, toBackend.healthy, to.Healthy(), "test idx: %d", idx) require.Equal(t, toBackend.connScore, to.ConnScore(), "test idx: %d", idx) + require.Equal(t, test.reason, reason, "test idx: %d", idx) } else { require.Nil(t, from, "test idx: %d", idx) require.Nil(t, to, "test idx: %d", idx) diff --git a/pkg/balance/router/metrics.go b/pkg/balance/router/metrics.go index fadb012f..0f640691 100644 --- a/pkg/balance/router/metrics.go +++ b/pkg/balance/router/metrics.go @@ -24,14 +24,22 @@ func succeedToLabel(succeed bool) string { return "fail" } -func addMigrateMetrics(from, to string, succeed bool, startTime monotime.Time) { +func addMigrateMetrics(from, to, reason string, succeed bool, startTime monotime.Time) { resLabel := succeedToLabel(succeed) - metrics.MigrateCounter.WithLabelValues(from, to, resLabel).Inc() + metrics.MigrateCounter.WithLabelValues(from, to, reason, resLabel).Inc() cost := monotime.Since(startTime) metrics.MigrateDurationHistogram.WithLabelValues(from, to, resLabel).Observe(cost.Seconds()) } func readMigrateCounter(from, to string, succeed bool) (int, error) { - return metrics.ReadCounter(metrics.MigrateCounter.WithLabelValues(from, to, succeedToLabel(succeed))) + v1, err := metrics.ReadCounter(metrics.MigrateCounter.WithLabelValues(from, to, "status", succeedToLabel(succeed))) + if err != nil { + return v1, err + } + v2, err := metrics.ReadCounter(metrics.MigrateCounter.WithLabelValues(from, to, "conn", succeedToLabel(succeed))) + if err != nil { + return v2, err + } + return v1 + v2, nil } diff --git a/pkg/balance/router/mock_test.go b/pkg/balance/router/mock_test.go index eabdbdd7..4c8d6311 100644 --- a/pkg/balance/router/mock_test.go +++ b/pkg/balance/router/mock_test.go @@ -180,7 +180,7 @@ var _ policy.BalancePolicy = (*mockBalancePolicy)(nil) type mockBalancePolicy struct { cfg atomic.Pointer[config.Config] - backendsToBalance func([]policy.BackendCtx) (from policy.BackendCtx, to policy.BackendCtx, balanceCount int, reason []zapcore.Field) + backendsToBalance func([]policy.BackendCtx) (from policy.BackendCtx, to policy.BackendCtx, balanceCount int, reason string, logFields []zapcore.Field) backendToRoute func([]policy.BackendCtx) policy.BackendCtx } @@ -195,11 +195,11 @@ func (m *mockBalancePolicy) BackendToRoute(backends []policy.BackendCtx) policy. return nil } -func (m *mockBalancePolicy) BackendsToBalance(backends []policy.BackendCtx) (from policy.BackendCtx, to policy.BackendCtx, balanceCount int, reason []zapcore.Field) { +func (m *mockBalancePolicy) BackendsToBalance(backends []policy.BackendCtx) (from policy.BackendCtx, to policy.BackendCtx, balanceCount int, reason string, logFields []zapcore.Field) { if m.backendsToBalance != nil { return m.backendsToBalance(backends) } - return nil, nil, 0, nil + return nil, nil, 0, "", nil } func (m *mockBalancePolicy) SetConfig(cfg *config.Config) { diff --git a/pkg/balance/router/router.go b/pkg/balance/router/router.go index 2a868ee0..ac582dc7 100644 --- a/pkg/balance/router/router.go +++ b/pkg/balance/router/router.go @@ -73,6 +73,7 @@ type RedirectableConn interface { type BackendInst interface { Addr() string Healthy() bool + Local() bool } // backendWrapper contains the connections on the backend. @@ -131,6 +132,13 @@ func (b *backendWrapper) ConnCount() int { return b.connList.Len() } +func (b *backendWrapper) Local() bool { + b.mu.RLock() + local := b.mu.Local + b.mu.RUnlock() + return local +} + func (b *backendWrapper) GetBackendInfo() observer.BackendInfo { b.mu.RLock() info := b.mu.BackendInfo @@ -155,6 +163,8 @@ func (b *backendWrapper) String() string { // connWrapper wraps RedirectableConn. type connWrapper struct { RedirectableConn + // The reason why the redirection happens. + redirectReason string // Reference to the target backend if it's redirecting, otherwise nil. redirectingBackend *backendWrapper // Last redirect start time of this connection. diff --git a/pkg/balance/router/router_score.go b/pkg/balance/router/router_score.go index 93f9e0fa..7b96f301 100644 --- a/pkg/balance/router/router_score.go +++ b/pkg/balance/router/router_score.go @@ -168,6 +168,7 @@ func (router *ScoreBasedRouter) RedirectConnections() error { connWrapper := ce.Value if connWrapper.phase != phaseRedirectNotify { connWrapper.phase = phaseRedirectNotify + connWrapper.redirectReason = "test" // Ignore the results. _ = connWrapper.Redirect(backend) connWrapper.redirectingBackend = backend @@ -225,7 +226,7 @@ func (router *ScoreBasedRouter) onRedirectFinished(from, to string, conn Redirec connWrapper.phase = phaseRedirectFail } connWrapper.redirectingBackend = nil - addMigrateMetrics(from, to, succeed, connWrapper.lastRedirect) + addMigrateMetrics(from, to, connWrapper.redirectReason, succeed, connWrapper.lastRedirect) } // OnConnClosed implements ConnEventReceiver.OnConnClosed interface. @@ -320,7 +321,7 @@ func (router *ScoreBasedRouter) rebalance(ctx context.Context) { backends = append(backends, backend) } - busiestBackend, idlestBackend, balanceCount, reason := router.policy.BackendsToBalance(backends) + busiestBackend, idlestBackend, balanceCount, reason, logFields := router.policy.BackendsToBalance(backends) if balanceCount == 0 { return } @@ -361,24 +362,25 @@ func (router *ScoreBasedRouter) rebalance(ctx context.Context) { if ce == nil { break } - router.redirectConn(ce.Value, fromBackend, toBackend, reason, curTime) + router.redirectConn(ce.Value, fromBackend, toBackend, reason, logFields, curTime) router.lastRedirectTime = curTime } } func (router *ScoreBasedRouter) redirectConn(conn *connWrapper, fromBackend *backendWrapper, toBackend *backendWrapper, - reason []zap.Field, curTime monotime.Time) { + reason string, logFields []zap.Field, curTime monotime.Time) { fields := []zap.Field{ zap.Uint64("connID", conn.ConnectionID()), zap.String("from", fromBackend.addr), zap.String("to", toBackend.addr), } - fields = append(fields, reason...) + fields = append(fields, logFields...) router.logger.Debug("begin redirect connection", fields...) fromBackend.connScore-- router.removeBackendIfEmpty(fromBackend) toBackend.connScore++ conn.phase = phaseRedirectNotify + conn.redirectReason = reason conn.lastRedirect = curTime conn.Redirect(toBackend) conn.redirectingBackend = toBackend diff --git a/pkg/balance/router/router_score_test.go b/pkg/balance/router/router_score_test.go index e0701e68..18545664 100644 --- a/pkg/balance/router/router_score_test.go +++ b/pkg/balance/router/router_score_test.go @@ -858,8 +858,8 @@ func TestControlSpeed(t *testing.T) { } for _, test := range tests { - bp.backendsToBalance = func(bc []policy.BackendCtx) (from policy.BackendCtx, to policy.BackendCtx, balanceCount int, reason []zapcore.Field) { - return tester.getBackendByIndex(0), tester.getBackendByIndex(1), test.balanceCount, nil + bp.backendsToBalance = func(bc []policy.BackendCtx) (from policy.BackendCtx, to policy.BackendCtx, balanceCount int, reason string, logFields []zapcore.Field) { + return tester.getBackendByIndex(0), tester.getBackendByIndex(1), test.balanceCount, "conn", nil } tester.router.lastRedirectTime = monotime.Time(0) require.Equal(t, total, tester.getBackendByIndex(0).connScore) diff --git a/pkg/balance/router/router_static.go b/pkg/balance/router/router_static.go index 0fabee7d..d9fe4772 100644 --- a/pkg/balance/router/router_static.go +++ b/pkg/balance/router/router_static.go @@ -99,3 +99,7 @@ func (b *StaticBackend) Healthy() bool { func (b *StaticBackend) SetHealthy(healthy bool) { b.healthy.Store(healthy) } + +func (b *StaticBackend) Local() bool { + return true +} diff --git a/pkg/manager/namespace/manager.go b/pkg/manager/namespace/manager.go index 7112e1ae..7dc542eb 100644 --- a/pkg/manager/namespace/manager.go +++ b/pkg/manager/namespace/manager.go @@ -52,7 +52,7 @@ func (mgr *NamespaceManager) buildNamespace(cfg *config.Namespace) (*Namespace, // init Router rt := router.NewScoreBasedRouter(logger.Named("router")) hc := observer.NewDefaultHealthCheck(mgr.httpCli, healthCheckCfg, logger.Named("hc")) - bo := observer.NewDefaultBackendObserver(logger.Named("observer"), healthCheckCfg, fetcher, hc) + bo := observer.NewDefaultBackendObserver(logger.Named("observer"), healthCheckCfg, fetcher, hc, mgr.cfgMgr) bo.Start(context.Background()) balancePolicy := factor.NewFactorBasedBalance(logger.Named("factor"), mgr.metricsReader) rt.Init(context.Background(), bo, balancePolicy, mgr.cfgMgr.GetConfig(), mgr.cfgMgr.WatchConfig()) diff --git a/pkg/metrics/balance.go b/pkg/metrics/balance.go index 4047e94e..9581fea2 100644 --- a/pkg/metrics/balance.go +++ b/pkg/metrics/balance.go @@ -15,6 +15,7 @@ const ( LblBackend = "backend" LblFrom = "from" LblTo = "to" + LblReason = "reason" LblMigrateResult = "migrate_res" ) @@ -33,7 +34,7 @@ var ( Subsystem: LabelBalance, Name: "migrate_total", Help: "Number and result of session migration.", - }, []string{LblFrom, LblTo, LblMigrateResult}) + }, []string{LblFrom, LblTo, LblReason, LblMigrateResult}) MigrateDurationHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ diff --git a/pkg/metrics/grafana/tiproxy_summary.json b/pkg/metrics/grafana/tiproxy_summary.json index a839bef8..edc2fd34 100644 --- a/pkg/metrics/grafana/tiproxy_summary.json +++ b/pkg/metrics/grafana/tiproxy_summary.json @@ -1602,6 +1602,92 @@ "show": true } ] + }, + { + "aliasColors": { }, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "Reasons of session migrations per minute.", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 6, + "w": 12, + "x": 12, + "y": 0 + }, + "id": 22, + "legend": { + "alignAsTable": false, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "sideWidth": null, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [ ], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "repeat": null, + "seriesOverrides": [ ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(increase(tiproxy_balance_migrate_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (reason)", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{reason}}", + "refId": "A" + } + ], + "thresholds": [ ], + "timeFrom": null, + "timeShift": null, + "title": "Session Migration Reasons", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [ ] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] } ], "repeat": null, @@ -1621,7 +1707,7 @@ "x": 0, "y": 0 }, - "id": 22, + "id": 23, "panels": [ { "aliasColors": { }, @@ -1638,7 +1724,7 @@ "x": 0, "y": 0 }, - "id": 23, + "id": 24, "legend": { "alignAsTable": false, "avg": false, @@ -1738,7 +1824,7 @@ "x": 12, "y": 0 }, - "id": 24, + "id": 25, "legend": { "alignAsTable": false, "avg": false, @@ -1824,7 +1910,7 @@ "x": 0, "y": 0 }, - "id": 25, + "id": 26, "legend": { "alignAsTable": false, "avg": false, @@ -1913,7 +1999,7 @@ "x": 0, "y": 0 }, - "id": 26, + "id": 27, "panels": [ { "aliasColors": { }, @@ -1930,7 +2016,7 @@ "x": 0, "y": 0 }, - "id": 27, + "id": 28, "legend": { "alignAsTable": false, "avg": false, @@ -2016,7 +2102,7 @@ "x": 12, "y": 0 }, - "id": 28, + "id": 29, "legend": { "alignAsTable": false, "avg": false, @@ -2102,7 +2188,7 @@ "x": 0, "y": 0 }, - "id": 29, + "id": 30, "legend": { "alignAsTable": false, "avg": false, @@ -2188,7 +2274,7 @@ "x": 12, "y": 0 }, - "id": 30, + "id": 31, "legend": { "alignAsTable": false, "avg": false, @@ -2258,6 +2344,92 @@ "show": true } ] + }, + { + "aliasColors": { }, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "Bytes per second between TiProxy and cross-location backends.", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 6, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 32, + "legend": { + "alignAsTable": false, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "sideWidth": null, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [ ], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "repeat": null, + "seriesOverrides": [ ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(tiproxy_traffic_cross_location_bytes{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (instance)", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}}", + "refId": "A" + } + ], + "thresholds": [ ], + "timeFrom": null, + "timeShift": null, + "title": "Cross Location Bytes/Second", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [ ] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] } ], "repeat": null, diff --git a/pkg/metrics/grafana/tiproxy_summary.jsonnet b/pkg/metrics/grafana/tiproxy_summary.jsonnet index 4b1629fa..5e210dd9 100644 --- a/pkg/metrics/grafana/tiproxy_summary.jsonnet +++ b/pkg/metrics/grafana/tiproxy_summary.jsonnet @@ -388,6 +388,20 @@ local bMigDurP = graphPanel.new( ) ); +local bMigReasonP = graphPanel.new( + title='Session Migration Reasons', + datasource=myDS, + legend_rightSide=true, + description='Reasons of session migrations per minute.', + format='short', +) +.addTarget( + prometheus.target( + 'sum(increase(tiproxy_balance_migrate_total{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance"}[1m])) by (reason)', + legendFormat='{{reason}}', + ) +); + // Backend Summary local backendRow = row.new(collapse=true, title='Backend'); local bGetDurP = graphPanel.new( @@ -503,6 +517,20 @@ local outPacketsP = graphPanel.new( ) ); +local crossBytesP = graphPanel.new( + title='Cross Location Bytes/Second', + datasource=myDS, + legend_rightSide=true, + description='Bytes per second between TiProxy and cross-location backends.', + format='short', +) +.addTarget( + prometheus.target( + 'sum(rate(tiproxy_traffic_cross_location_bytes{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance"}[1m])) by (instance)', + legendFormat='{{instance}}', + ) +); + // Merge together. local panelW = 12; local panelH = 6; @@ -543,6 +571,7 @@ newDash .addPanel(bConnP, gridPos=leftPanelPos) .addPanel(bMigCounterP, gridPos=rightPanelPos) .addPanel(bMigDurP, gridPos=leftPanelPos) + .addPanel(bMigReasonP, gridPos=rightPanelPos) , gridPos=rowPos ) @@ -560,6 +589,7 @@ newDash .addPanel(inPacketsP, gridPos=rightPanelPos) .addPanel(outBytesP, gridPos=leftPanelPos) .addPanel(outPacketsP, gridPos=rightPanelPos) + .addPanel(crossBytesP, gridPos=leftPanelPos) , gridPos=rowPos ) diff --git a/pkg/metrics/traffic.go b/pkg/metrics/traffic.go index 8859acb7..871dc125 100644 --- a/pkg/metrics/traffic.go +++ b/pkg/metrics/traffic.go @@ -37,4 +37,12 @@ var ( Name: "outbound_packets", Help: "Counter of packets to backends.", }, []string{LblBackend}) + + CrossLocationBytesCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: ModuleProxy, + Subsystem: LabelTraffic, + Name: "cross_location_bytes", + Help: "Counter of bytes between TiProxy and cross-location backends.", + }) ) diff --git a/pkg/proxy/backend/backend_conn_mgr.go b/pkg/proxy/backend/backend_conn_mgr.go index 37099368..50d4df2e 100644 --- a/pkg/proxy/backend/backend_conn_mgr.go +++ b/pkg/proxy/backend/backend_conn_mgr.go @@ -400,7 +400,7 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) ( func (mgr *BackendConnManager) updateTraffic(backendIO *pnet.PacketIO) { inBytes, inPackets, outBytes, outPackets := backendIO.InBytes(), backendIO.InPackets(), backendIO.OutBytes(), backendIO.OutPackets() - addTraffic(backendIO.RemoteAddr().String(), inBytes-mgr.inBytes, inPackets-mgr.inPackets, outBytes-mgr.outBytes, outPackets-mgr.outPackets) + addTraffic(backendIO.RemoteAddr().String(), inBytes-mgr.inBytes, inPackets-mgr.inPackets, outBytes-mgr.outBytes, outPackets-mgr.outPackets, mgr.curBackend.Local()) mgr.inBytes, mgr.inPackets, mgr.outBytes, mgr.outPackets = inBytes, inPackets, outBytes, outPackets } diff --git a/pkg/proxy/backend/backend_conn_mgr_test.go b/pkg/proxy/backend/backend_conn_mgr_test.go index 993e32ba..862c3531 100644 --- a/pkg/proxy/backend/backend_conn_mgr_test.go +++ b/pkg/proxy/backend/backend_conn_mgr_test.go @@ -17,6 +17,7 @@ import ( "github.com/pingcap/tiproxy/lib/util/logger" "github.com/pingcap/tiproxy/lib/util/waitgroup" "github.com/pingcap/tiproxy/pkg/balance/router" + "github.com/pingcap/tiproxy/pkg/metrics" pnet "github.com/pingcap/tiproxy/pkg/proxy/net" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -77,6 +78,7 @@ func (mer *mockEventReceiver) checkEvent(t *testing.T, eventName int) { type mockBackendInst struct { addr string healthy atomic.Bool + local atomic.Bool } func newMockBackendInst(ts *backendMgrTester) *mockBackendInst { @@ -84,6 +86,7 @@ func newMockBackendInst(ts *backendMgrTester) *mockBackendInst { addr: ts.tc.backendListener.Addr().String(), } mbi.setHealthy(true) + mbi.setLocal(true) return mbi } @@ -99,6 +102,14 @@ func (mbi *mockBackendInst) setHealthy(healthy bool) { mbi.healthy.Store(healthy) } +func (mbi *mockBackendInst) Local() bool { + return mbi.local.Load() +} + +func (mbi *mockBackendInst) setLocal(local bool) { + mbi.local.Store(local) +} + type runner struct { client func(packetIO *pnet.PacketIO) error proxy func(clientIO, backendIO *pnet.PacketIO) error @@ -1248,12 +1259,18 @@ func TestTrafficMetrics(t *testing.T) { inBytes, inPackets, outBytes, outPackets, err = readTraffic(addr) require.NoError(t, err) require.True(t, inBytes > 0 && inPackets > 0 && outBytes > 0 && outPackets > 0) + crossLocationBytes, err := metrics.ReadCounter(metrics.CrossLocationBytesCounter) + require.NoError(t, err) require.NoError(t, ts.forwardCmd4Proxy(clientIO, backendIO)) inBytes2, inPackets2, outBytes2, outPackets2, err := readTraffic(addr) require.NoError(t, err) require.True(t, inBytes2 > inBytes && inPackets2 > inPackets && outBytes2 > outBytes && outPackets2 > outPackets) require.True(t, inBytes2 > 4096 && inPackets2 > 1000) inBytes, inPackets, outBytes, outPackets = inBytes2, inPackets2, outBytes2, outPackets2 + // The first backend is local, so no cross-az traffic. + crossLocationBytes2, err := metrics.ReadCounter(metrics.CrossLocationBytesCounter) + require.NoError(t, err) + require.True(t, crossLocationBytes2 == crossLocationBytes) return nil }, backend: func(packetIO *pnet.PacketIO) error { @@ -1265,8 +1282,14 @@ func TestTrafficMetrics(t *testing.T) { }, // 2nd handshake: redirect { - client: nil, - proxy: ts.redirectSucceed4Proxy, + client: nil, + proxy: func(clientIO, backendIO *pnet.PacketIO) error { + backendInst := newMockBackendInst(ts) + backendInst.setLocal(false) + ts.mp.Redirect(backendInst) + ts.mp.getEventReceiver().(*mockEventReceiver).checkEvent(ts.t, eventSucceed) + return nil + }, backend: ts.redirectSucceed4Backend, }, // the traffic should still increase after redirection @@ -1280,10 +1303,16 @@ func TestTrafficMetrics(t *testing.T) { inBytes1, inPackets1, outBytes1, outPackets1, err := readTraffic(addr) require.NoError(t, err) require.True(t, inBytes1 > inBytes && inPackets1 > inPackets && outBytes1 > outBytes && outPackets1 > outPackets) + crossLocationBytes, err := metrics.ReadCounter(metrics.CrossLocationBytesCounter) + require.NoError(t, err) require.NoError(t, ts.forwardCmd4Proxy(clientIO, backendIO)) inBytes2, inPackets2, outBytes2, outPackets2, err := readTraffic(addr) require.NoError(t, err) require.True(t, inBytes2 > inBytes1 && inPackets2 > inPackets1 && outBytes2 > outBytes1 && outPackets2 > outPackets1) + // The second backend is remote, so exists cross-az traffic. + crossLocationBytes2, err := metrics.ReadCounter(metrics.CrossLocationBytesCounter) + require.NoError(t, err) + require.True(t, crossLocationBytes2 > crossLocationBytes) return nil }, backend: func(packetIO *pnet.PacketIO) error { diff --git a/pkg/proxy/backend/metrics.go b/pkg/proxy/backend/metrics.go index 37a9afbd..80adfffb 100644 --- a/pkg/proxy/backend/metrics.go +++ b/pkg/proxy/backend/metrics.go @@ -57,7 +57,10 @@ func addCmdMetrics(cmd pnet.Command, addr string, startTime monotime.Time) { mc.observer.Observe(cost.Seconds()) } -func addTraffic(addr string, inBytes, inPackets, outBytes, outPackets uint64) { +func addTraffic(addr string, inBytes, inPackets, outBytes, outPackets uint64, local bool) { + if !local { + metrics.CrossLocationBytesCounter.Add(float64(inBytes + outBytes)) + } cache.Lock() defer cache.Unlock() // Updating traffic per IO costs too much CPU, so update it per command.