Commit
update connection testing logic and error handling
- gRPC TCP connection is now tested as the first step
- removed the limited retries of the socket connection test; the test is now retried every retryInterval until it succeeds
- retryInterval lowered to 5 seconds
- updated shutdown process (sketched below):
-- when the connection to dpdk fails, the process exits and does not retry the connection
-- this way the failure can be handled by Kubernetes and shows up in the restart counter
-- added graceful shutdown of the HTTP server
- updated error handling:
-- queryTelemetry now returns an error; the error indicates a problem with the dpdk telemetry socket and shuts the program down
-- connection-testing functions merged together to clean up the code
- check the UID to select the correct dpdk telemetry socket path
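
A minimal sketch of the resulting flow, condensed from the main.go diff below. Error handling and flag parsing are trimmed; the pollAndSignal helper and the :8080 listen address are illustrative stand-ins, not names taken from the commit:

package main

import (
	"context"
	"net/http"
	"time"

	"github.com/sirupsen/logrus"
)

// pollAndSignal stands in for the exporter's metrics loop: instead of
// reconnecting forever, it signals exitChan when the dpdk telemetry
// connection fails, so the process exits and Kubernetes restarts the pod.
func pollAndSignal(log *logrus.Logger, exitChan chan<- struct{}) {
	// ... connect to the telemetry socket, update metrics periodically,
	// and on any telemetry error:
	exitChan <- struct{}{}
}

func main() {
	log := logrus.New()
	exitChan := make(chan struct{})
	go pollAndSignal(log, exitChan)

	server := &http.Server{Addr: ":8080"} // exporter listen address
	go func() {
		if err := server.ListenAndServe(); err != http.ErrServerClosed {
			log.Errorf("ListenAndServe failed: %v", err)
		}
	}()

	<-exitChan // telemetry is gone; stop serving and let the pod restart

	// Graceful shutdown with a 5-second timeout, as in the commit.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := server.Shutdown(ctx); err != nil {
		log.Infof("HTTP server Shutdown error: %v", err)
	}
}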
vlorinc committed Oct 1, 2024
1 parent 80fcbfa commit 70007cd
Showing 3 changed files with 134 additions and 70 deletions.
136 changes: 85 additions & 51 deletions cli/dpservice-exporter/main.go
@@ -4,12 +4,15 @@
package main

import (
"context"
"flag"
"fmt"
"net"
"net/http"
"net/netip"
"os"
"os/user"
"strconv"
"time"

"github.com/ironcore-dev/dpservice/cli/dpservice-exporter/metrics"
@@ -19,21 +22,19 @@ import (
)

const (
maxRetries = 5
sleepTime = 10 * time.Second
retryInterval = 5 * time.Second
)

var (
version = "unknown"
grpcPort uint64
version = "unknown"
grpcPort uint64
host string
pollIntervalFlag int
)

func main() {
var conn net.Conn
var err error
var host string
var hostnameFlag string
var pollIntervalFlag int
var exporterPort uint64
var exporterAddr netip.AddrPort

@@ -63,6 +64,13 @@ func main() {
}
log.Infof("Hostname: %s", host)

uid, err := getUID()
if err != nil {
log.Warningf("Could not get UID, assuming root: %v", err)
} else if uid != 0 {
metrics.SocketPath = fmt.Sprintf("/run/user/%d/dpdk/rte/dpdk_telemetry.v2", uid)
}

r := prometheus.NewRegistry()
r.MustRegister(metrics.InterfaceStat)
r.MustRegister(metrics.CallCount)
@@ -71,73 +79,56 @@ func main() {

http.Handle("/metrics", promhttp.HandlerFor(r, promhttp.HandlerOpts{}))

conn = connectToDpdkTelemetry(log)
defer conn.Close()
exitChan := make(chan struct{})
go periodicMetricsUpdate(log, exitChan)

// Run server in goroutine
log.Infof("Server starting on :%v...", exporterPort)
server := &http.Server{Addr: exporterAddr.String()}
go func() {
for {
if !testDpdkConnection(conn, log) {
log.Infof("Reconnecting to %s", metrics.SocketPath)
conn = connectToDpdkTelemetry(log)
log.Infof("Reconnected to %s", metrics.SocketPath)
} else {
metrics.Update(conn, host, log)
}

time.Sleep(time.Duration(pollIntervalFlag) * time.Second)
if err := server.ListenAndServe(); err != http.ErrServerClosed {
log.Errorf("ListenAndServe failed: %v", err)
}
}()

log.Infof("Server starting on :%v...", exporterPort)
<-exitChan
// Create a context with a timeout to ensure the server shuts down gracefully
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err = http.ListenAndServe(exporterAddr.String(), nil)
if err != nil {
log.Fatalf("ListenAndServe failed: %d", err)
// Shutdown the server
log.Info("Shutting down server...")
if err := server.Shutdown(ctx); err != nil {
log.Infof("HTTP server Shutdown error: %v\n", err)
}
}

// Tests if DPDK telemetry connection is working by writing to the connection
func testDpdkConnection(conn net.Conn, log *logrus.Logger) bool {
// Check if dpservice TCP port on localhost is open
// Checks if dpservice GRPC TCP port on localhost is open
func testGrpcConnection(log *logrus.Logger) bool {
dpserviceAddress := fmt.Sprintf("127.0.0.1:%d", grpcPort)
tcpConn, err := net.DialTimeout("tcp", dpserviceAddress, 2*time.Second)
if err != nil {
log.Warningf("TCP port %d on localhost is not open: %v", grpcPort, err)
log.Errorf("TCP port %d on localhost is not open: %v. Retry in %d seconds.", grpcPort, err, int(retryInterval.Seconds()))
return false
}
defer tcpConn.Close()

_, err = conn.Write([]byte("/"))
if err != nil {
return false
}
flushErr := flushSocket(conn)
if flushErr != nil {
log.Fatalf("Failed to read response from %s: %v", metrics.SocketPath, err)
}
return true
}

// Connects to the DPDK telemetry
func connectToDpdkTelemetry(log *logrus.Logger) net.Conn {
for i := 0; i < maxRetries; i++ {
conn, err := net.Dial("unixpacket", metrics.SocketPath)
if err == nil {
err = flushSocket(conn)
if err != nil {
log.Fatalf("Failed to read response from %s: %v", metrics.SocketPath, err)
}
return conn
}
log.Warningf("Failed to connect to %s: %v. Retry %d of %d", metrics.SocketPath, err, i+1, maxRetries)
if i < maxRetries-1 {
time.Sleep(sleepTime)
}
if i == maxRetries-1 {
log.Fatal("Exiting. Maximum connection retries reached")
conn, err := net.Dial("unixpacket", metrics.SocketPath)
if err != nil {
return nil
} else {
err = flushSocket(conn)
if err != nil {
log.Errorf("Failed to read response from %s: %v", metrics.SocketPath, err)
return nil
}
return conn
}
return nil
}

// Flushes the connection socket
@@ -168,3 +159,46 @@ func getHostname(hostnameFlag string) (string, error) {
return hostnameFlag, nil
}
}

// Gets UID from os
func getUID() (int, error) {
user, err := user.Current()
if err != nil {
return -1, fmt.Errorf("could not get user: %v", err)
}
uid, err := strconv.Atoi(user.Uid)
if err != nil {
return -1, fmt.Errorf("could not get uid: %v", err)
}
return uid, nil
}

// Initializes the connection and updates metrics every pollIntervalFlag seconds
func periodicMetricsUpdate(log *logrus.Logger, exitChan chan struct{}) {
log.Infof("Waiting for GRPC 127.0.0.1:%d", grpcPort)
for !testGrpcConnection(log) {
time.Sleep(retryInterval)
}
log.Infof("Connected to GRPC 127.0.0.1:%d", grpcPort)

log.Infof("Trying to connect to dpdk telemetry socket: %s", metrics.SocketPath)
conn := connectToDpdkTelemetry(log)
if conn == nil {
log.Error("Connection to dpdk telemetry failed; exiting...")
exitChan <- struct{}{}
return
}
defer conn.Close()
log.Infof("Connected to dpdk telemetry socket: %s", metrics.SocketPath)

log.Infof("Starting to update metrics in %d second intervals.", pollIntervalFlag)
for {
err := metrics.Update(conn, host, log)
if err != nil {
log.Errorf("Connection to dpdk telemetry failed: %v; exiting...", err)
exitChan <- struct{}{}
return
}
time.Sleep(time.Duration(pollIntervalFlag) * time.Second)
}
}
66 changes: 48 additions & 18 deletions cli/dpservice-exporter/metrics/metrics.go
@@ -14,13 +14,13 @@ import (
"github.com/sirupsen/logrus"
)

const SocketPath = "/var/run/dpdk/rte/dpdk_telemetry.v2"
var SocketPath = "/var/run/dpdk/rte/dpdk_telemetry.v2"

func queryTelemetry(conn net.Conn, log *logrus.Logger, command string, response interface{}) {
func queryTelemetry(conn net.Conn, log *logrus.Logger, command string, response interface{}) error {
_, err := conn.Write([]byte(command))
if err != nil {
log.Errorf("Failed to send command to %s: %v", SocketPath, err)
return
return err
}

respBytes := make([]byte, 1024*6)
@@ -29,7 +29,7 @@ func queryTelemetry(conn net.Conn, log *logrus.Logger, command string, response
n, err := conn.Read(respBytes)
if err != nil {
log.Errorf("Failed to read response from %s: %v", SocketPath, err)
return
return err
}
responseBuffer.Write(respBytes[:n])
parts := strings.SplitN(command, ",", 2)
@@ -43,15 +43,22 @@ func queryTelemetry(conn net.Conn, log *logrus.Logger, command string, response
if err != nil {
log.Errorf("Failed to unmarshal JSON response: %v", err)
}
return nil
}

func Update(conn net.Conn, hostname string, log *logrus.Logger) {
func Update(conn net.Conn, hostname string, log *logrus.Logger) error {
var ealHeapList EalHeapList
queryTelemetry(conn, log, "/eal/heap_list", &ealHeapList)
err := queryTelemetry(conn, log, "/eal/heap_list", &ealHeapList)
if err != nil {
return err
}

for _, id := range ealHeapList.Value {
var ealHeapInfo EalHeapInfo
queryTelemetry(conn, log, fmt.Sprintf("/eal/heap_info,%d", id), &ealHeapInfo)
err = queryTelemetry(conn, log, fmt.Sprintf("/eal/heap_info,%d", id), &ealHeapInfo)
if err != nil {
return err
}
for key, value := range ealHeapInfo.Value {
// Only export metrics of type float64 (/eal/heap_info contains also some string values)
if v, ok := value.(float64); ok {
@@ -61,49 +68,72 @@ func Update(conn net.Conn, hostname string, log *logrus.Logger) {
}

var ethdevList EthdevList
queryTelemetry(conn, log, "/ethdev/list", &ethdevList)

err = queryTelemetry(conn, log, "/ethdev/list", &ethdevList)
if err != nil {
return err
}
for _, id := range ethdevList.Value {
var ethdevInfo EthdevInfo
queryTelemetry(conn, log, fmt.Sprintf("/ethdev/info,%d", id), &ethdevInfo)
err = queryTelemetry(conn, log, fmt.Sprintf("/ethdev/info,%d", id), &ethdevInfo)
if err != nil {
return err
}

var ethdevXstats EthdevXstats
queryTelemetry(conn, log, fmt.Sprintf("/ethdev/xstats,%d", id), &ethdevXstats)
err = queryTelemetry(conn, log, fmt.Sprintf("/ethdev/xstats,%d", id), &ethdevXstats)
if err != nil {
return err
}

for statName, statValueFloat := range ethdevXstats.Value {
InterfaceStat.With(prometheus.Labels{"interface": ethdevInfo.Value.Name, "stat_name": statName}).Set(statValueFloat)
}
}

var dpserviceNatPort DpServiceNatPort
queryTelemetry(conn, log, "/dp_service/nat/used_port_count", &dpserviceNatPort)
err = queryTelemetry(conn, log, "/dp_service/nat/used_port_count", &dpserviceNatPort)
if err != nil {
return err
}
for ifName, portCount := range dpserviceNatPort.Value {
InterfaceStat.With(prometheus.Labels{"interface": ifName, "stat_name": "nat_used_port_count"}).Set(float64(portCount))
}

var dpserviceVirtsvcPort DpServiceVirtsvcPort
queryTelemetry(conn, log, "/dp_service/virtsvc/used_port_count", &dpserviceVirtsvcPort)
err = queryTelemetry(conn, log, "/dp_service/virtsvc/used_port_count", &dpserviceVirtsvcPort)
if err != nil {
return err
}
for ifName, portCount := range dpserviceVirtsvcPort.Value {
InterfaceStat.With(prometheus.Labels{"interface": ifName, "stat_name": "virtsvc_used_port_count"}).Set(float64(portCount))
}

var dpserviceFirewallRuleCount DpServiceFirewallRuleCount
queryTelemetry(conn, log, "/dp_service/firewall/rule_count", &dpserviceFirewallRuleCount)
err = queryTelemetry(conn, log, "/dp_service/firewall/rule_count", &dpserviceFirewallRuleCount)
if err != nil {
return err
}
for ifName, fwRuleCount := range dpserviceFirewallRuleCount.Value {
InterfaceStat.With(prometheus.Labels{"interface": ifName, "stat_name": "firewall_rule_count"}).Set(float64(fwRuleCount))
}

var dpserviceCallCount DpServiceGraphCallCount
queryTelemetry(conn, log, "/dp_service/graph/call_count", &dpserviceCallCount)

err = queryTelemetry(conn, log, "/dp_service/graph/call_count", &dpserviceCallCount)
if err != nil {
return err
}
for graphNodeName, callCount := range dpserviceCallCount.GraphCallCnt.Node_0_to_255 {
CallCount.With(prometheus.Labels{"node_name": hostname, "graph_node": graphNodeName}).Set(callCount)
}

var dpServiceHashTableSaturation DpServiceHashTableSaturation
queryTelemetry(conn, log, "/dp_service/table/saturation", &dpServiceHashTableSaturation)

err = queryTelemetry(conn, log, "/dp_service/table/saturation", &dpServiceHashTableSaturation)
if err != nil {
return err
}
for table, saturation := range dpServiceHashTableSaturation.Value {
HashTableSaturation.With(prometheus.Labels{"table_name": table, "stat_name": "capacity"}).Set(saturation.Capacity)
HashTableSaturation.With(prometheus.Labels{"table_name": table, "stat_name": "entries"}).Set(saturation.Entries)
}
return nil
}
2 changes: 1 addition & 1 deletion test/local/exporter.py
@@ -14,7 +14,7 @@ def __init__(self, build_path):
self.cmd = build_path + "/cli/dpservice-exporter/dpservice-exporter"

def start(self):
self.process = subprocess.Popen([self.cmd, f"-port={exporter_port}", f"--grpc-port={grpc-port}"])
self.process = subprocess.Popen([self.cmd, f"--port={exporter_port}", f"--grpc-port={grpc_port}"])

def stop(self):
if self.process:
