diff --git a/cmd/stateless-lb/main.go b/cmd/stateless-lb/main.go index 5d8aa990..d5930e2b 100644 --- a/cmd/stateless-lb/main.go +++ b/cmd/stateless-lb/main.go @@ -49,6 +49,7 @@ import ( "github.com/nordix/meridio/pkg/loadbalancer/flow" "github.com/nordix/meridio/pkg/loadbalancer/nfqlb" "github.com/nordix/meridio/pkg/loadbalancer/stream" + "github.com/nordix/meridio/pkg/loadbalancer/target" "github.com/nordix/meridio/pkg/loadbalancer/types" "github.com/nordix/meridio/pkg/log" "github.com/nordix/meridio/pkg/metrics" @@ -167,6 +168,16 @@ func main() { }, } + hostname, err := os.Hostname() + if err != nil { + log.Fatal(logger, "Unable to get hostname", "error", err) + } + + targetHitsMetrics, err := target.NewTargetHitsMetrics(hostname) + if err != nil { + log.Fatal(logger, "Unable to init lb target metrics", "error", err) + } + lbFactory := nfqlb.NewLbFactory(nfqlb.WithNFQueue(config.Nfqueue)) nfa, err := nfqlb.NewNetfilterAdaptor(nfqlb.WithNFQueue(config.Nfqueue), nfqlb.WithNFQueueFanout(config.NfqueueFanout)) if err != nil { @@ -185,6 +196,7 @@ func main() { lbFactory, // to spawn nfqlb instance for each Stream created nfa, // netfilter kernel configuration to steer VIP traffic to nfqlb process config.IdentifierOffsetStart, + targetHitsMetrics, ) interfaceMonitorEndpoint := interfacemonitor.NewServer(interfaceMonitor, sns, netUtils) @@ -256,13 +268,6 @@ func main() { return } - hostname, err := os.Hostname() - if err != nil { - logger.Error(err, "Unable to get hostname") - cancel() - return - } - err = flow.CollectMetrics( flow.WithHostname(hostname), flow.WithTrenchName(config.TrenchName), @@ -274,6 +279,13 @@ func main() { return } + err = targetHitsMetrics.Collect() + if err != nil { + logger.Error(err, "Unable to start target hits metrics collector") + cancel() + return + } + metricsServer := metrics.Server{ IP: "", Port: config.MetricsPort, @@ -311,6 +323,7 @@ type SimpleNetworkService struct { lbFactory types.NFQueueLoadBalancerFactory nfa types.NFAdaptor 
natHandler *nat.NatHandler + targetHitsMetrics *target.HitsMetrics } /* // Request checks if allowed to serve the request @@ -347,6 +360,7 @@ func newSimpleNetworkService( lbFactory types.NFQueueLoadBalancerFactory, nfa types.NFAdaptor, identifierOffsetStart int, + targetHitsMetrics *target.HitsMetrics, ) *SimpleNetworkService { identifierOffsetGenerator := NewIdentifierOffsetGenerator(identifierOffsetStart) logger := log.FromContextOrGlobal(ctx).WithValues("class", "SimpleNetworkService") @@ -370,6 +384,7 @@ func newSimpleNetworkService( lbFactory: lbFactory, nfa: nfa, natHandler: nh, + targetHitsMetrics: targetHitsMetrics, } logger.Info("Created", "object", simpleNetworkService) return simpleNetworkService @@ -579,6 +594,7 @@ func (sns *SimpleNetworkService) addStream(strm *nspAPI.Stream) error { sns.netUtils, sns.lbFactory, identifierOffset, + sns.targetHitsMetrics, ) if err != nil { return err diff --git a/docs/observability/dashboard.json b/docs/observability/dashboard.json index 4155b070..7a90f483 100644 --- a/docs/observability/dashboard.json +++ b/docs/observability/dashboard.json @@ -18,7 +18,7 @@ "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 0, - "id": 28, + "id": 29, "links": [], "liveNow": false, "panels": [ @@ -30,38 +30,16 @@ "fieldConfig": { "defaults": { "color": { - "mode": "palette-classic" + "mode": "thresholds" }, "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" + "align": "auto", + "cellOptions": { + "type": "auto" }, - "thresholdsStyle": { - "mode": "off" - } + 
"filterable": false, + "inspect": false, + "minWidth": 150 }, "mappings": [], "thresholds": { @@ -81,24 +59,25 @@ "overrides": [] }, "gridPos": { - "h": 8, + "h": 20, "w": 12, "x": 0, "y": 0 }, - "id": 1, + "id": 3, "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true + "cellHeight": "sm", + "footer": { + "countRows": false, + "fields": "", + "reducer": [ + "sum" + ], + "show": false }, - "tooltip": { - "mode": "single", - "sort": "none" - } + "showHeader": true }, + "pluginVersion": "10.1.5", "targets": [ { "datasource": { @@ -107,17 +86,138 @@ }, "disableTextWrap": false, "editorMode": "builder", - "expr": "sum by(Flow, Stream, Conduit, Trench) (rate(meridio_conduit_stream_flow_matches_total[$__rate_interval]))", + "exemplar": false, + "expr": "meridio_conduit_stream_target_hits_packets_total", + "format": "table", "fullMetaSearch": false, "includeNullMetadata": true, - "legendFormat": "{{Flow}}.{{Stream}}.{{Conduit}}.{{Trench}}", - "range": true, - "refId": "A", + "instant": true, + "legendFormat": "__auto", + "range": false, + "refId": "packets", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "exemplar": false, + "expr": "meridio_conduit_stream_target_hits_bytes_total", + "format": "table", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": true, + "legendFormat": "__auto", + "range": false, + "refId": "bytes", "useBackend": false } ], - "title": "Flow Match (packet per second)", - "type": "timeseries" + "title": "Target Hits", + "transformations": [ + { + "id": "merge", + "options": {} + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Time": true, + "Value": false, + "__name__": true, + "container": true, + "endpoint": true, + "instance": true, + "job": true, + "namespace": true, + "otel_scope_name": true, + "pod": true + }, + 
"indexByName": { + "Conduit": 1, + "Hostname": 3, + "IPs": 5, + "Identifier": 4, + "Stream": 2, + "Time": 6, + "Trench": 0, + "Value": 15, + "__name__": 7, + "container": 8, + "endpoint": 9, + "instance": 10, + "job": 11, + "namespace": 12, + "otel_scope_name": 13, + "pod": 14 + }, + "renameByName": { + "Value #packets": "" + } + } + }, + { + "id": "groupBy", + "options": { + "fields": { + "Conduit": { + "aggregations": [], + "operation": "groupby" + }, + "Hostname": { + "aggregations": [], + "operation": "groupby" + }, + "IPs": { + "aggregations": [], + "operation": "groupby" + }, + "Identifier": { + "aggregations": [], + "operation": "groupby" + }, + "Stream": { + "aggregations": [], + "operation": "groupby" + }, + "Trench": { + "aggregations": [], + "operation": "groupby" + }, + "Value #bytes": { + "aggregations": [ + "lastNotNull" + ], + "operation": "aggregate" + }, + "Value #packets": { + "aggregations": [ + "lastNotNull" + ], + "operation": "aggregate" + } + } + } + }, + { + "id": "organize", + "options": { + "excludeByName": {}, + "indexByName": {}, + "renameByName": { + "Identifier": "", + "Value #bytes (lastNotNull)": "Bytes", + "Value #packets (lastNotNull)": "Packets" + } + } + } + ], + "type": "table" }, { "datasource": { @@ -172,7 +272,7 @@ }, "showHeader": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.1.5", "targets": [ { "datasource": { @@ -231,11 +331,316 @@ "pod": 13, "service": 14 }, - "renameByName": {} + "renameByName": { + "Value": "Packets" + } } } ], "type": "table" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + 
"lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 12 + }, + "id": 1, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "sum by(Flow, Stream, Conduit, Trench) (rate(meridio_conduit_stream_flow_matches_total[$__rate_interval]))", + "fullMetaSearch": false, + "includeNullMetadata": true, + "legendFormat": "{{Flow}}.{{Stream}}.{{Conduit}}.{{Trench}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Flow Match (packet per second)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + 
"mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 20 + }, + "id": 5, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "10.1.5", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "exemplar": false, + "expr": "sum by(Trench, Conduit, Stream) (rate(meridio_conduit_stream_target_hits_packets_total[$__rate_interval]))", + "format": "time_series", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": false, + "instant": false, + "legendFormat": "{{Stream}}.{{Conduit}}.{{Trench}}", + "range": true, + "refId": "packets", + "useBackend": false + } + ], + "title": "Stream (packets per second)", + "transformations": [], + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + 
"h": 8, + "w": 12, + "x": 12, + "y": 20 + }, + "id": 6, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "10.1.5", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "exemplar": false, + "expr": "sum by(Trench, Conduit, Stream) (rate(meridio_conduit_stream_target_hits_bytes_total[$__rate_interval]))", + "format": "time_series", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": false, + "instant": false, + "legendFormat": "{{Stream}}.{{Conduit}}.{{Trench}}", + "range": true, + "refId": "packets", + "useBackend": false + } + ], + "title": "Stream (bytes per second)", + "transformations": [], + "type": "timeseries" } ], "refresh": "5s", @@ -243,7 +648,19 @@ "style": "dark", "tags": [], "templating": { - "list": [] + "list": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "filters": [], + "hide": 0, + "name": "Filters", + "skipUrlSync": false, + "type": "adhoc" + } + ] }, "time": { "from": "now-15m", diff --git a/docs/observability/metrics.md b/docs/observability/metrics.md index 6fbabc1a..6b7db958 100644 --- a/docs/observability/metrics.md +++ b/docs/observability/metrics.md @@ -16,17 +16,6 @@ Counts number of `METRIC_TYPE` for a network interface. * Attactor (optional) * Interface Name -### meridio.conduit.stream.status (Planned) - -Stream status in the conduit instance. - -* Type: Gauge (Health Metric) -* Attributes: - * Pod Name - * Trench - * Conduit - * Stream - ### meridio.conduit.stream.flow.matches Counts number of packets that have matched a flow. @@ -39,9 +28,11 @@ Counts number of packets that have matched a flow. 
* Stream * Flow -### meridio.conduit.stream.target.packet.hits (Planned) +### meridio.conduit.stream.target.hits.`METRIC_TYPE` + +`METRIC_TYPE`: packets, bytes -Counts number of packets that have hit a target. +Counts number of `METRIC_TYPE` that have hit a target. * Type: Counter * Attributes: @@ -49,7 +40,8 @@ Counts number of packets that have hit a target. * Trench * Conduit * Stream - * Target (identifier + IPs) + * Identifier + * IPs ### meridio.conduit.stream.target.latency (Planned) diff --git a/pkg/loadbalancer/stream/loadbalancer.go b/pkg/loadbalancer/stream/loadbalancer.go index 45494129..60b7e340 100644 --- a/pkg/loadbalancer/stream/loadbalancer.go +++ b/pkg/loadbalancer/stream/loadbalancer.go @@ -29,6 +29,7 @@ import ( nspAPI "github.com/nordix/meridio/api/nsp/v1" "github.com/nordix/meridio/pkg/health" "github.com/nordix/meridio/pkg/loadbalancer/flow" + targetMetrics "github.com/nordix/meridio/pkg/loadbalancer/target" "github.com/nordix/meridio/pkg/loadbalancer/types" "github.com/nordix/meridio/pkg/log" "github.com/nordix/meridio/pkg/networking" @@ -54,6 +55,7 @@ type LoadBalancer struct { defrag *Defrag pendingCh chan struct{} // trigger pending Targets processing logger logr.Logger + targetHitsMetrics *targetMetrics.HitsMetrics } func New( @@ -64,6 +66,7 @@ func New( netUtils networking.Utils, lbFactory types.NFQueueLoadBalancerFactory, identifierOffset int, + targetHitsMetrics *targetMetrics.HitsMetrics, ) (types.Stream, error) { n := int(stream.GetMaxTargets()) m := int(stream.GetMaxTargets()) * 100 @@ -86,6 +89,7 @@ func New( pendingTargets: make(map[int]types.Target), pendingCh: make(chan struct{}, 10), logger: logger, + targetHitsMetrics: targetHitsMetrics, } // first enable kernel's IP defrag except for the interfaces facing targets // (defrag is needed by Flows to match rules with L4 information) @@ -166,7 +170,7 @@ func (lb *LoadBalancer) AddTarget(target types.Target) error { if exists { return errors.New("the target is already registered") 
} - err := target.Configure(lb.IdentifierOffset) // TODO: avoid multiple identical ip rule entries (e.g. after container crash) + err := target.Configure() // TODO: avoid multiple identical ip rule entries (e.g. after container crash) if err != nil { lb.addPendingTarget(target) returnErr := err @@ -343,7 +347,7 @@ func (lb *LoadBalancer) setTargets(targets []*nspAPI.Target) error { var errFinal error newTargetsMap := make(map[int]types.Target) for _, target := range targets { - t, err := NewTarget(target, lb.netUtils) + t, err := NewTarget(target, lb.netUtils, lb.targetHitsMetrics, lb.IdentifierOffset) if err != nil { continue } diff --git a/pkg/loadbalancer/stream/target.go b/pkg/loadbalancer/stream/target.go index 21ea3d1b..bbecd5d4 100644 --- a/pkg/loadbalancer/stream/target.go +++ b/pkg/loadbalancer/stream/target.go @@ -23,22 +23,27 @@ import ( "strconv" nspAPI "github.com/nordix/meridio/api/nsp/v1" + targetMetrics "github.com/nordix/meridio/pkg/loadbalancer/target" "github.com/nordix/meridio/pkg/loadbalancer/types" "github.com/nordix/meridio/pkg/networking" ) type target struct { - fwMarks []networking.FWMarkRoute - nspTarget *nspAPI.Target - netUtils networking.Utils - identifier int + fwMarks []networking.FWMarkRoute + nspTarget *nspAPI.Target + netUtils networking.Utils + identifier int + identifierOffset int + targetHitsMetrics *targetMetrics.HitsMetrics } -func NewTarget(nspTarget *nspAPI.Target, netUtils networking.Utils) (types.Target, error) { +func NewTarget(nspTarget *nspAPI.Target, netUtils networking.Utils, targetHitsMetrics *targetMetrics.HitsMetrics, identifierOffset int) (types.Target, error) { target := &target{ - fwMarks: []networking.FWMarkRoute{}, - nspTarget: nspTarget, - netUtils: netUtils, + fwMarks: []networking.FWMarkRoute{}, + nspTarget: nspTarget, + netUtils: netUtils, + targetHitsMetrics: targetHitsMetrics, + identifierOffset: identifierOffset, } if nspTarget.GetStatus() != nspAPI.Target_ENABLED { return nil, errors.New("the target 
is not enabled") @@ -72,19 +77,20 @@ func (t *target) Verify() bool { return true } -func (t *target) Configure(identifierOffset int) error { +func (t *target) Configure() error { if t.fwMarks == nil { t.fwMarks = []networking.FWMarkRoute{} } + offsetId := t.identifier + t.identifierOffset for _, ip := range t.GetIps() { var fwMark networking.FWMarkRoute - offsetId := t.identifier + identifierOffset fwMark, err := t.netUtils.NewFWMarkRoute(ip, offsetId, offsetId) if err != nil { return err } t.fwMarks = append(t.fwMarks, fwMark) } + _ = t.targetHitsMetrics.Register(offsetId, t.nspTarget) return nil } @@ -101,8 +107,11 @@ func (t *target) Delete() error { } } t.fwMarks = []networking.FWMarkRoute{} + offsetId := t.identifier + t.identifierOffset + _ = t.targetHitsMetrics.Unregister(offsetId) return errFinal } + func (t *target) MarshalJSON() ([]byte, error) { ts := struct { Identifier int `json:"identifier"` @@ -113,4 +122,3 @@ func (t *target) MarshalJSON() ([]byte, error) { } return json.Marshal(&ts) } - diff --git a/pkg/loadbalancer/target/metrics.go b/pkg/loadbalancer/target/metrics.go new file mode 100644 index 00000000..e3af574b --- /dev/null +++ b/pkg/loadbalancer/target/metrics.go @@ -0,0 +1,355 @@ +/* +Copyright (c) 2023 Nordix Foundation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package target
+
+import (
+	"context"
+	"encoding/binary"
+	"fmt"
+	"sync"
+
+	"github.com/google/nftables"
+	"github.com/google/nftables/expr"
+	nspAPI "github.com/nordix/meridio/api/nsp/v1"
+	meridioMetrics "github.com/nordix/meridio/pkg/metrics"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+)
+
+const (
+	tableName = "meridio-metrics"
+	chainName = "target-hits"
+	setName   = "fwmark-verdict"
+)
+
+// HitsMetrics counts, with one nftables counter chain per fwmark, the packets
+// and bytes that have hit each registered target and exports them as metrics.
+// The targets and fwmarkChains maps are guarded by mu.
+type HitsMetrics struct {
+	hostname      string
+	meter         metric.Meter
+	targets       map[int]*nspAPI.Target
+	fwmarkChains  map[int]*nftables.Chain
+	table         *nftables.Table
+	chain         *nftables.Chain
+	fwmarkVerdict *nftables.Set
+	mu            sync.Mutex
+}
+
+// NewTargetHitsMetrics sets up nftables and returns a HitsMetrics for hostname.
+func NewTargetHitsMetrics(hostname string) (*HitsMetrics, error) {
+	meter := otel.GetMeterProvider().Meter(meridioMetrics.METER_NAME)
+	hm := &HitsMetrics{
+		hostname:     hostname,
+		meter:        meter,
+		targets:      map[int]*nspAPI.Target{},
+		fwmarkChains: map[int]*nftables.Chain{},
+	}
+
+	err := hm.init()
+	if err != nil {
+		return nil, err
+	}
+
+	return hm, nil
+}
+
+// Delete removes the meridio-metrics nftables table.
+func (hm *HitsMetrics) Delete() error {
+	conn := &nftables.Conn{}
+
+	conn.DelTable(&nftables.Table{
+		Family: nftables.TableFamilyINet,
+		Name:   tableName,
+	})
+
+	return conn.Flush()
+}
+
+// init creates the nftables table, the postrouting chain and the verdict map
+// that dispatches marked packets to the per-target counter chains.
+func (hm *HitsMetrics) init() error {
+	// Best effort: drop any table left by a previous run; it may not exist.
+	_ = hm.Delete()
+
+	conn := &nftables.Conn{}
+
+	// nft add table inet meridio-metrics
+	hm.table = conn.AddTable(&nftables.Table{
+		Family: nftables.TableFamilyINet,
+		Name:   tableName,
+	})
+
+	// nft 'add chain inet meridio-metrics target-hits { type filter hook postrouting priority filter ; }'
+	hm.chain = conn.AddChain(&nftables.Chain{
+		Name:     chainName,
+		Table:    hm.table,
+		Type:     nftables.ChainTypeFilter,
+		Hooknum:  nftables.ChainHookPostrouting,
+		Priority: nftables.ChainPriorityRef(*nftables.ChainPriorityFilter),
+	})
+
+	err := conn.Flush()
+	if err != nil {
+		return fmt.Errorf("target metrics, failed to flush (add table and chain): %w", err)
+	}
+
+	// nft add map inet meridio-metrics fwmark-verdict { type mark : verdict\; }
+	hm.fwmarkVerdict = &nftables.Set{
+		Table:    hm.table,
+		Name:     setName,
+		IsMap:    true,
+		KeyType:  nftables.TypeMark,
+		DataType: nftables.TypeVerdict,
+	}
+	err = conn.AddSet(hm.fwmarkVerdict, nil)
+	if err != nil {
+		return fmt.Errorf("target metrics, failed to AddSet: %w", err) // shouldn't happen since elements is nil
+	}
+
+	// nft --debug all add rule inet meridio-metrics target-hits mark != 0x0 mark vmap @fwmark-verdict
+	// [ meta load mark => reg 1 ]
+	// [ cmp neq reg 1 0x00000000 ]
+	// [ meta load mark => reg 1 ]
+	// [ lookup reg 1 set fwmark-verdict dreg 0 ]
+	_ = conn.AddRule(&nftables.Rule{
+		Table: hm.table,
+		Chain: hm.chain,
+		Exprs: []expr.Any{
+			&expr.Meta{
+				Key:      expr.MetaKeyMARK,
+				Register: 1,
+			},
+			// The mark is 32 bits wide, so compare all 4 bytes as nft does.
+			&expr.Cmp{
+				Op:       expr.CmpOpNeq,
+				Register: 1,
+				Data:     encodeID(0),
+			},
+			&expr.Lookup{
+				SourceRegister: 1,
+				SetID:          hm.fwmarkVerdict.ID,
+				SetName:        hm.fwmarkVerdict.Name,
+				IsDestRegSet:   true,
+				DestRegister:   0,
+			},
+		},
+	})
+
+	return conn.Flush()
+}
+
+// Register adds a counter chain for the target and maps the fwmark id to it
+// in the verdict map. Registering an already registered id is a no-op.
+func (hm *HitsMetrics) Register(id int, target *nspAPI.Target) error {
+	hm.mu.Lock()
+	defer hm.mu.Unlock()
+
+	_, exists := hm.fwmarkChains[id]
+	if exists {
+		return nil
+	}
+
+	conn := &nftables.Conn{}
+
+	// nft add chain inet meridio-metrics fwmark-100
+	fwmarkChain := conn.AddChain(&nftables.Chain{
+		Name:  fmt.Sprintf("fwmark-%d", id),
+		Table: hm.table,
+	})
+
+	// nft --debug all add rule inet meridio-metrics fwmark-100 counter
+	// [ counter pkts 0 bytes 0 ]
+	_ = conn.AddRule(&nftables.Rule{
+		Table: hm.table,
+		Chain: fwmarkChain,
+		Exprs: []expr.Any{
+			&expr.Counter{
+				Bytes:   0,
+				Packets: 0,
+			},
+		},
+	})
+
+	err := conn.SetAddElements(hm.fwmarkVerdict, []nftables.SetElement{
+		{
+			Key: encodeID(id),
+			VerdictData: &expr.Verdict{
+				Kind:  expr.VerdictJump,
+				Chain: fwmarkChain.Name,
+			},
+		},
+	})
+	if err != nil {
+		return fmt.Errorf("target metrics, failed to SetAddElements: %w", err)
+	}
+
+	err = conn.Flush()
+	if err != nil {
+		return err
+	}
+
+	// Record the target only once nftables accepted it, so that a failed
+	// registration can be retried and is never reported by Collect.
+	hm.targets[id] = target
+	hm.fwmarkChains[id] = fwmarkChain
+
+	return nil
+}
+
+// Unregister removes the counter chain of the target and its entry in the
+// verdict map. Unregistering an unknown id is a no-op.
+func (hm *HitsMetrics) Unregister(id int) error {
+	hm.mu.Lock()
+	defer hm.mu.Unlock()
+
+	fwmarkChain, exists := hm.fwmarkChains[id]
+	if !exists {
+		return nil
+	}
+
+	// Forget the target first so that Collect stops reporting it even when
+	// the nftables clean-up below fails.
+	delete(hm.targets, id)
+	delete(hm.fwmarkChains, id)
+
+	conn := &nftables.Conn{}
+
+	err := conn.SetDeleteElements(hm.fwmarkVerdict, []nftables.SetElement{
+		{
+			Key: encodeID(id),
+			VerdictData: &expr.Verdict{
+				Kind:  expr.VerdictJump,
+				Chain: fwmarkChain.Name,
+			},
+		},
+	})
+	if err != nil {
+		return fmt.Errorf("target metrics, failed to SetDeleteElements: %w", err)
+	}
+
+	conn.DelChain(fwmarkChain)
+
+	return conn.Flush()
+}
+
+// Collect registers the observable counters exporting packets and bytes per target.
+func (hm *HitsMetrics) Collect() error {
+	_, err := hm.meter.Int64ObservableCounter(
+		meridioMetrics.MERIDIO_CONDUIT_STREAM_TARGET_HITS_PACKETS,
+		metric.WithUnit("packets"), // TODO: what unit must be set?
+		metric.WithDescription("Counts number of packets that have hit a target."),
+		metric.WithInt64Callback(func(_ context.Context, observer metric.Int64Observer) error {
+			return hm.observe(observer, func(c *expr.Counter) uint64 { return c.Packets })
+		}),
+	)
+	if err != nil {
+		return fmt.Errorf("target metrics, failed to Int64ObservableCounter: %w", err)
+	}
+
+	_, err = hm.meter.Int64ObservableCounter(
+		meridioMetrics.MERIDIO_CONDUIT_STREAM_TARGET_HITS_BYTES,
+		metric.WithUnit("bytes"), // TODO: what unit must be set?
+		metric.WithDescription("Counts number of bytes that have hit a target."),
+		metric.WithInt64Callback(func(_ context.Context, observer metric.Int64Observer) error {
+			return hm.observe(observer, func(c *expr.Counter) uint64 { return c.Bytes })
+		}),
+	)
+	if err != nil {
+		return fmt.Errorf("target metrics, failed to Int64ObservableCounter: %w", err)
+	}
+
+	return nil
+}
+
+// observe reports, for every registered target, the counter field selected by
+// value together with the attributes identifying the target.
+func (hm *HitsMetrics) observe(observer metric.Int64Observer, value func(*expr.Counter) uint64) error {
+	targetMetrics, err := hm.getMetrics()
+	if err != nil {
+		return fmt.Errorf("target metrics, failed to getMetrics: %w", err)
+	}
+	for targetID, metrics := range targetMetrics {
+		// Find the registered target for the collected counter
+		hm.mu.Lock()
+		target, exists := hm.targets[targetID]
+		hm.mu.Unlock()
+		if !exists {
+			continue
+		}
+
+		if target.GetStream() == nil ||
+			target.GetStream().GetConduit() == nil ||
+			target.GetStream().GetConduit().GetTrench() == nil {
+			continue
+		}
+
+		streamName := target.GetStream().GetName()
+		conduitName := target.GetStream().GetConduit().GetName()
+		trenchName := target.GetStream().GetConduit().GetTrench().GetName()
+
+		observer.Observe(
+			int64(value(metrics)),
+			metric.WithAttributes(attribute.String("Hostname", hm.hostname)),
+			metric.WithAttributes(attribute.String("Trench", trenchName)),
+			metric.WithAttributes(attribute.String("Conduit", conduitName)),
+			metric.WithAttributes(attribute.String("Stream", streamName)),
+			metric.WithAttributes(attribute.Int("Identifier", targetID)),
+			metric.WithAttributes(attribute.StringSlice("IPs", target.GetIps())),
+		)
+	}
+	return nil
+}
+
+// getMetrics reads the counter of every registered fwmark chain and returns
+// them keyed by fwmark. Chains whose counter cannot be read are skipped.
+func (hm *HitsMetrics) getMetrics() (map[int]*expr.Counter, error) {
+	// Snapshot under the lock: Register and Unregister modify the map.
+	hm.mu.Lock()
+	chains := make(map[int]*nftables.Chain, len(hm.fwmarkChains))
+	for id, chain := range hm.fwmarkChains {
+		chains[id] = chain
+	}
+	hm.mu.Unlock()
+
+	counters := make(map[int]*expr.Counter, len(chains))
+	conn := &nftables.Conn{}
+
+	for id, chain := range chains {
+		rules, err := conn.GetRules(hm.table, chain)
+		if err != nil || len(rules) < 1 || len(rules[0].Exprs) < 1 {
+			continue
+		}
+
+		counterExpr, ok := rules[0].Exprs[0].(*expr.Counter)
+		if !ok {
+			continue
+		}
+
+		counters[id] = counterExpr
+	}
+
+	return counters, nil
+}
+
+// encodeID encodes value as the 4 bytes, in native byte order, of a fwmark.
+func encodeID(value int) []byte {
+	bs := make([]byte, 4)
+	binary.NativeEndian.PutUint32(bs, uint32(value))
+	return bs
+}
diff --git a/pkg/loadbalancer/types/target.go b/pkg/loadbalancer/types/target.go
index 190a0d16..88657075 100644
--- a/pkg/loadbalancer/types/target.go
+++ b/pkg/loadbalancer/types/target.go
@@ -19,7 +19,7 @@ package types
 type Target interface {
 	GetIps() []string
 	GetIdentifier() int
-	Configure(identifierOffset int) error
+	Configure() error
 	Verify() bool
 	Delete() error
 }
diff --git a/pkg/metrics/const.go b/pkg/metrics/const.go
index c78404ee..5c89f30c 100644
--- a/pkg/metrics/const.go
+++ b/pkg/metrics/const.go
@@ -17,7 +17,9 @@ limitations under the License.
 package metrics
 
 const (
-	MERIDIO_CONDUIT_STREAM_FLOW_MATCHES = "meridio.conduit.stream.flow.matches"
+	MERIDIO_CONDUIT_STREAM_FLOW_MATCHES        = "meridio.conduit.stream.flow.matches"
+	MERIDIO_CONDUIT_STREAM_TARGET_HITS_PACKETS = "meridio.conduit.stream.target.hits.packets"
+	MERIDIO_CONDUIT_STREAM_TARGET_HITS_BYTES   = "meridio.conduit.stream.target.hits.bytes"
 
 	METER_NAME = "Meridio"
 )