From 89a060c034b618a5a438cddd9f11e389d94b4686 Mon Sep 17 00:00:00 2001 From: Lionel Jouin Date: Fri, 27 Oct 2023 16:58:18 +0200 Subject: [PATCH] Target hits metrics using nftables verdict map The nftables implementation has been changed to a chain with no hook per fwmark. Another chain will catch the fwmark and jump to the counting chains. --- docs/observability/metrics.md | 19 +-- pkg/loadbalancer/stream/loadbalancer.go | 10 +- pkg/loadbalancer/stream/target.go | 12 +- pkg/loadbalancer/target/metrics.go | 209 +++++++++++++++--------- pkg/loadbalancer/types/target.go | 4 +- 5 files changed, 149 insertions(+), 105 deletions(-) diff --git a/docs/observability/metrics.md b/docs/observability/metrics.md index df030f34..6b7db958 100644 --- a/docs/observability/metrics.md +++ b/docs/observability/metrics.md @@ -28,21 +28,11 @@ Counts number of packets that have matched a flow. * Stream * Flow -### meridio.conduit.stream.target.hits.packets +### meridio.conduit.stream.target.hits.`METRIC_TYPE` -Counts number of packets that have hit a target. +`METRIC_TYPE`: packets, bytes -* Type: Counter -* Attributes: - * Pod Name - * Trench - * Conduit - * Stream - * Target (identifier + IPs) - -### meridio.conduit.stream.target.hits.bytes - -Counts number of bytes that have hit a target. +Counts number of `METRIC_TYPE` that have hit a target. * Type: Counter * Attributes: @@ -50,7 +40,8 @@ Counts number of bytes that have hit a target. * Trench * Conduit * Stream - * Target (identifier + IPs) + * Identifier + * IPs ### meridio.conduit.stream.target.latency (Planned) diff --git a/pkg/loadbalancer/stream/loadbalancer.go b/pkg/loadbalancer/stream/loadbalancer.go index bc575024..60b7e340 100644 --- a/pkg/loadbalancer/stream/loadbalancer.go +++ b/pkg/loadbalancer/stream/loadbalancer.go @@ -170,11 +170,11 @@ func (lb *LoadBalancer) AddTarget(target types.Target) error { if exists { return errors.New("the target is already registered") } - err := target.Configure(lb.IdentifierOffset) // TODO: avoid multiple identical ip rule entries (e.g. after container crash) + err := target.Configure() // TODO: avoid multiple identical ip rule entries (e.g. after container crash) if err != nil { lb.addPendingTarget(target) returnErr := err - err = target.Delete(lb.IdentifierOffset) + err = target.Delete() if err != nil { return fmt.Errorf("%w; %v", err, returnErr) } @@ -184,7 +184,7 @@ func (lb *LoadBalancer) AddTarget(target types.Target) error { if err != nil { lb.addPendingTarget(target) returnErr := err - err = target.Delete(lb.IdentifierOffset) + err = target.Delete() if err != nil { return fmt.Errorf("%w; %v", err, returnErr) } @@ -208,7 +208,7 @@ func (lb *LoadBalancer) RemoveTarget(identifier int) error { if err != nil { errFinal = fmt.Errorf("%w; %v", errFinal, err) // todo } - err = target.Delete(lb.IdentifierOffset) + err = target.Delete() if err != nil { errFinal = fmt.Errorf("%w; %v", errFinal, err) // todo } @@ -347,7 +347,7 @@ func (lb *LoadBalancer) setTargets(targets []*nspAPI.Target) error { var errFinal error newTargetsMap := make(map[int]types.Target) for _, target := range targets { - t, err := NewTarget(target, lb.netUtils, lb.targetHitsMetrics) + t, err := NewTarget(target, lb.netUtils, lb.targetHitsMetrics, lb.IdentifierOffset) if err != nil { continue } diff --git a/pkg/loadbalancer/stream/target.go b/pkg/loadbalancer/stream/target.go index 772f2a42..bbecd5d4 100644 --- a/pkg/loadbalancer/stream/target.go +++ b/pkg/loadbalancer/stream/target.go @@ -33,15 +33,17 @@ type target struct { nspTarget *nspAPI.Target netUtils networking.Utils identifier int + identifierOffset int targetHitsMetrics *targetMetrics.HitsMetrics } -func NewTarget(nspTarget *nspAPI.Target, netUtils networking.Utils, targetHitsMetrics *targetMetrics.HitsMetrics) (types.Target, error) { +func NewTarget(nspTarget *nspAPI.Target, netUtils networking.Utils, targetHitsMetrics *targetMetrics.HitsMetrics, identifierOffset int) (types.Target, error) { target := &target{ fwMarks: []networking.FWMarkRoute{}, nspTarget: nspTarget, netUtils: netUtils, targetHitsMetrics: targetHitsMetrics, + identifierOffset: identifierOffset, } if nspTarget.GetStatus() != nspAPI.Target_ENABLED { return nil, errors.New("the target is not enabled") @@ -75,11 +77,11 @@ func (t *target) Verify() bool { return true } -func (t *target) Configure(identifierOffset int) error { +func (t *target) Configure() error { if t.fwMarks == nil { t.fwMarks = []networking.FWMarkRoute{} } - offsetId := t.identifier + identifierOffset + offsetId := t.identifier + t.identifierOffset for _, ip := range t.GetIps() { var fwMark networking.FWMarkRoute fwMark, err := t.netUtils.NewFWMarkRoute(ip, offsetId, offsetId) @@ -92,7 +94,7 @@ func (t *target) Configure(identifierOffset int) error { return nil } -func (t *target) Delete(identifierOffset int) error { +func (t *target) Delete() error { if t.fwMarks == nil { t.fwMarks = []networking.FWMarkRoute{} return nil @@ -105,7 +107,7 @@ func (t *target) Delete(identifierOffset int) error { } } t.fwMarks = []networking.FWMarkRoute{} - offsetId := t.identifier + identifierOffset + offsetId := t.identifier + t.identifierOffset _ = t.targetHitsMetrics.Unregister(offsetId) return errFinal } diff --git a/pkg/loadbalancer/target/metrics.go b/pkg/loadbalancer/target/metrics.go index 6bf4c6ce..e3af574b 100644 --- a/pkg/loadbalancer/target/metrics.go +++ b/pkg/loadbalancer/target/metrics.go @@ -19,6 +19,7 @@ package target import ( "context" "encoding/binary" + "fmt" "sync" "github.com/google/nftables" @@ -33,23 +34,27 @@ import ( const ( tableName = "meridio-metrics" chainName = "target-hits" + setName = "fwmark-verdict" ) type HitsMetrics struct { - hostname string - meter metric.Meter - targets map[int]*nspAPI.Target - table *nftables.Table - chain *nftables.Chain - mu sync.Mutex + hostname string + meter metric.Meter + targets map[int]*nspAPI.Target + fwmarkChains map[int]*nftables.Chain + table *nftables.Table + chain *nftables.Chain + fwmarkVerdict *nftables.Set + mu sync.Mutex } func NewTargetHitsMetrics(hostname string) (*HitsMetrics, error) { meter := otel.GetMeterProvider().Meter(meridioMetrics.METER_NAME) hm := &HitsMetrics{ - hostname: hostname, - meter: meter, - targets: map[int]*nspAPI.Target{}, + hostname: hostname, + meter: meter, + targets: map[int]*nspAPI.Target{}, + fwmarkChains: map[int]*nftables.Chain{}, } err := hm.init() @@ -60,21 +65,82 @@ func NewTargetHitsMetrics(hostname string) (*HitsMetrics, error) { return hm, nil } +func (hm *HitsMetrics) Delete() error { + conn := &nftables.Conn{} + + conn.DelTable(&nftables.Table{ + Family: nftables.TableFamilyINet, + Name: tableName, + }) + + return conn.Flush() +} + // init creates the nftables table and chain. func (hm *HitsMetrics) init() error { + _ = hm.Delete() + conn := &nftables.Conn{} + // nft add table inet meridio-metrics hm.table = conn.AddTable(&nftables.Table{ Family: nftables.TableFamilyINet, Name: tableName, }) + // nft 'add chain inet meridio-metrics target-hits { type filter hook postrouting priority filter ; }' hm.chain = conn.AddChain(&nftables.Chain{ Name: chainName, Table: hm.table, Type: nftables.ChainTypeFilter, Hooknum: nftables.ChainHookPostrouting, - Priority: nftables.ChainPriorityRef(-500), + Priority: nftables.ChainPriorityRef(*nftables.ChainPriorityFilter), + }) + + err := conn.Flush() + if err != nil { + return fmt.Errorf("target metrics, failed to flush (add table and chain): %w", err) + } + + // nft add map inet meridio-metrics fwmark-verdict { type mark : verdict\; } + hm.fwmarkVerdict = &nftables.Set{ + Table: hm.table, + Name: setName, + IsMap: true, + KeyType: nftables.TypeMark, + DataType: nftables.TypeVerdict, + } + err = conn.AddSet(hm.fwmarkVerdict, nil) + if err != nil { + return fmt.Errorf("target metrics, failed to AddSet: %w", err) // shouldn't happen since elements is nil + } + + // nft --debug all add rule inet meridio-metrics target-hits mark != 0x0 mark vmap @fwmark-verdict + // [ meta load mark => reg 1 ] + // [ cmp neq reg 1 0x00000000 ] + // [ meta load mark => reg 1 ] + // [ lookup reg 1 set fwmark-verdict dreg 0 ] + _ = conn.AddRule(&nftables.Rule{ + Table: hm.table, + Chain: hm.chain, + Exprs: []expr.Any{ + &expr.Meta{ + Key: expr.MetaKeyMARK, + Register: 1, + }, + &expr.Cmp{ + Op: expr.CmpOpNeq, + Register: 1, + Data: []byte{0x0}, + }, + &expr.Lookup{ + SourceRegister: 1, + SetID: hm.fwmarkVerdict.ID, + SetName: hm.fwmarkVerdict.Name, + IsDestRegSet: true, + DestRegister: 0, + }, + }, }) return conn.Flush() @@ -85,38 +151,27 @@ func (hm *HitsMetrics) Register(id int, target *nspAPI.Target) error { hm.mu.Lock() defer hm.mu.Unlock() - targetMetrics, err := hm.getMetrics() - if err != nil { - return err - } - - hm.targets[id] = target - - _, exists := targetMetrics[id] + _, exists := hm.fwmarkChains[id] if exists { return nil } + hm.targets[id] = target + conn := &nftables.Conn{} - // nft --debug all add rule inet meridio-metrics target-hits meta mark 0x13dc counter - // [ meta load mark => reg 1 ] - // [ cmp eq reg 1 0x000013dc ] + // nft add chain inet meridio-metrics fwmark-100 + fwmarkChain := conn.AddChain(&nftables.Chain{ + Name: fmt.Sprintf("fwmark-%d", id), + Table: hm.table, + }) + + // nft --debug all add rule inet meridio-metrics fwmark-100 counter // [ counter pkts 0 bytes 0 ] _ = conn.AddRule(&nftables.Rule{ Table: hm.table, - Chain: hm.chain, - // Handle: , + Chain: fwmarkChain, Exprs: []expr.Any{ - &expr.Meta{ - Key: expr.MetaKeyMARK, - Register: 1, - }, - &expr.Cmp{ - Op: expr.CmpOpEq, - Register: 1, - Data: encodeID(id), - }, &expr.Counter{ Bytes: 0, Packets: 0, @@ -124,6 +179,21 @@ func (hm *HitsMetrics) Register(id int, target *nspAPI.Target) error { }, }) + hm.fwmarkChains[id] = fwmarkChain + + err := conn.SetAddElements(hm.fwmarkVerdict, []nftables.SetElement{ + { + Key: encodeID(id), + VerdictData: &expr.Verdict{ + Kind: expr.VerdictJump, + Chain: fwmarkChain.Name, + }, + }, + }) + if err != nil { + return fmt.Errorf("target metrics, failed to SetAddElements: %w", err) + } + return conn.Flush() } @@ -132,25 +202,31 @@ func (hm *HitsMetrics) Unregister(id int) error { hm.mu.Lock() defer hm.mu.Unlock() - delete(hm.targets, id) - - targetMetrics, err := hm.getRules() - if err != nil { - return err - } - - rule, exists := targetMetrics[id] + fwmarkChain, exists := hm.fwmarkChains[id] if !exists { return nil } + delete(hm.targets, id) + delete(hm.fwmarkChains, id) + conn := &nftables.Conn{} - err = conn.DelRule(rule) + err := conn.SetDeleteElements(hm.fwmarkVerdict, []nftables.SetElement{ + { + Key: encodeID(id), + VerdictData: &expr.Verdict{ + Kind: expr.VerdictJump, + Chain: fwmarkChain.Name, + }, + }, + }) if err != nil { - return err + return fmt.Errorf("target metrics, failed to SetDeleteElements: %w", err) } + conn.DelChain(fwmarkChain) + return conn.Flush() } @@ -163,7 +239,7 @@ func (hm *HitsMetrics) Collect() error { metric.WithInt64Callback(func(ctx context.Context, observer metric.Int64Observer) error { targetMetrics, err := hm.getMetrics() if err != nil { - return err + return fmt.Errorf("target metrics, failed to getMetrics: %w", err) } for targetID, metrics := range targetMetrics { // Find the registred target for the collected counter @@ -198,7 +274,7 @@ func (hm *HitsMetrics) Collect() error { }), ) if err != nil { - return err + return fmt.Errorf("target metrics, failed to Int64ObservableCounter: %w", err) } _, err = hm.meter.Int64ObservableCounter( @@ -208,7 +284,7 @@ func (hm *HitsMetrics) Collect() error { metric.WithInt64Callback(func(ctx context.Context, observer metric.Int64Observer) error { targetMetrics, err := hm.getMetrics() if err != nil { - return err + return fmt.Errorf("target metrics, failed to getMetrics: %w", err) } for targetID, metrics := range targetMetrics { // Find the registred target for the collected counter @@ -243,7 +319,7 @@ func (hm *HitsMetrics) Collect() error { }), ) if err != nil { - return err + return fmt.Errorf("target metrics, failed to Int64ObservableCounter: %w", err) } return nil @@ -253,44 +329,23 @@ func (hm *HitsMetrics) Collect() error { // the metrics/counter as value func (hm *HitsMetrics) getMetrics() (map[int]*expr.Counter, error) { counters := map[int]*expr.Counter{} + conn := &nftables.Conn{} - rules, err := hm.getRules() - if err != nil { - return nil, err - } - - for id, rule := range rules { - counterExpr := rule.Exprs[2].(*expr.Counter) - if counterExpr == nil { + for id, chain := range hm.fwmarkChains { + rules, err := conn.GetRules(hm.table, chain) + if err != nil || len(rules) < 1 || len(rules[0].Exprs) < 1 { continue } - counters[id] = counterExpr - } - - return counters, nil -} - -func (hm *HitsMetrics) getRules() (map[int]*nftables.Rule, error) { - conn := &nftables.Conn{} - - rules, err := conn.GetRules(hm.table, hm.chain) - if err != nil { - return nil, err - } - - rulesMap := map[int]*nftables.Rule{} - - for _, rule := range rules { - cmpExpr := rule.Exprs[1].(*expr.Cmp) - if cmpExpr == nil { + counterExpr, ok := rules[0].Exprs[0].(*expr.Counter) + if !ok { continue } - rulesMap[decodeID(cmpExpr.Data)] = rule + counters[id] = counterExpr } - return rulesMap, nil + return counters, nil } func encodeID(value int) []byte { @@ -298,7 +353,3 @@ func encodeID(value int) []byte { binary.NativeEndian.PutUint32(bs, uint32(value)) return bs } - -func decodeID(value []byte) int { - return int(binary.NativeEndian.Uint32(value)) -} diff --git a/pkg/loadbalancer/types/target.go b/pkg/loadbalancer/types/target.go index 37535676..88657075 100644 --- a/pkg/loadbalancer/types/target.go +++ b/pkg/loadbalancer/types/target.go @@ -19,7 +19,7 @@ package types type Target interface { GetIps() []string GetIdentifier() int - Configure(identifierOffset int) error + Configure() error Verify() bool - Delete(identifierOffset int) error + Delete() error }