Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added csv stats knob #37

Merged
merged 9 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,478 changes: 0 additions & 1,478 deletions coverage.out

This file was deleted.

4 changes: 4 additions & 0 deletions grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func getGPRCDialOptions(jctx *JCtx, vendor *vendor) ([]grpc.DialOption, error) {

if *stateHandler {
opts = append(opts, grpc.WithStatsHandler(&statshandler{jctx: jctx}))
if isCsvStatsEnabled(jctx) {
jctx.config.InternalJtimon.csvLogger.Printf(fmt.Sprintf("%s,%s,%s,%s,%s,%s,%s,%s,%s\n",
"sensor-path", "sequence-number", "component-id", "sub-component-id", "packet-size", "p-ts", "e-ts", "re-stream-creation-ts", "re-payload-get-ts"))
}
}

switch *compression {
Expand Down
36 changes: 12 additions & 24 deletions internal_jtimon.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,24 @@ package main
import (
"encoding/json"
"fmt"
gnmi "github.com/Juniper/jtimon/gnmi/gnmi"
na_pb "github.com/Juniper/jtimon/telemetry"
"log"
"os"
"regexp"
"strings"

gnmi "github.com/Juniper/jtimon/gnmi/gnmi"
na_pb "github.com/Juniper/jtimon/telemetry"
)

// InternalJtimonConfig type
type InternalJtimonConfig struct {
DataLog string `json:"data-log-file"`
CsvLog string `json:"csv-log-file"`
out *os.File
preGnmiOut *os.File
csvOut *os.File
logger *log.Logger
preGnmiLogger *log.Logger
}

type InternalJtimonPathElem struct {
Name string `json:"name"`
}

type InternalJtimonPath struct {
Elems []InternalJtimonPathElem `json:"elem"`
}

type InternalJtimonVal struct {
StringVal string `json:"string_val"`
}

type InternalJtimonUpdate struct {
Path InternalJtimonPath `json:"path"`
Val InternalJtimonVal `json:"val"`
csvLogger *log.Logger
}

func internalJtimonLogInit(jctx *JCtx) {
Expand Down Expand Up @@ -77,6 +62,10 @@ func internalJtimonLogInit(jctx *JCtx) {
log.Printf("logging in %s_pre-gnmi for %s:%d [in the format of internal jtimon tool]\n",
jctx.config.InternalJtimon.DataLog, jctx.config.Host, jctx.config.Port)
}

if *stateHandler && jctx.config.InternalJtimon.CsvLog != "" {
csvStatsLogInit(jctx)
}
}

func internalJtimonLogStop(jctx *JCtx) {
Expand All @@ -90,6 +79,9 @@ func internalJtimonLogStop(jctx *JCtx) {
jctx.config.InternalJtimon.preGnmiOut = nil
jctx.config.InternalJtimon.preGnmiLogger = nil
}
if *stateHandler && jctx.config.InternalJtimon.CsvLog != "" {
csvStatsLogStop(jctx)
}
}

func isInternalJtimonLogging(jctx *JCtx) bool {
Expand Down Expand Up @@ -216,7 +208,3 @@ func jLogInternalJtimonForPreGnmi(jctx *JCtx, ocdata *na_pb.OpenConfigData, outS
// Log here in the format of internal jtimon
jctx.config.InternalJtimon.preGnmiLogger.Printf("%s", outString)
}

func jLogUpdateOnChange(jctx *JCtx, kv map[string]string) {
return
}
162 changes: 162 additions & 0 deletions statshandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@ package main

import (
"fmt"
gnmi_ext1 "github.com/Juniper/jtimon/gnmi/gnmi_ext"
gnmi_juniper_header_ext "github.com/Juniper/jtimon/gnmi/gnmi_juniper_header_ext"
"log"
"os"
"sync"
"time"

gnmi_pb "github.com/Juniper/jtimon/gnmi/gnmi"
na_pb "github.com/Juniper/jtimon/telemetry"
proto "github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"google.golang.org/grpc/stats"
)
Expand All @@ -20,6 +26,20 @@ type statsCtx struct {
totalInHeaderWireLength uint64
}

type kpiStats struct {
SensorName string
Path string
Streamed_path string
Component string
SequenceNumber uint64
ComponentId uint32
SubComponentId uint32
Timestamp uint64
notif_timestamp int64
re_stream_creation_timestamp uint64
re_payload_get_timestamp uint64
}

type statshandler struct {
jctx *JCtx
}
Expand Down Expand Up @@ -52,12 +72,111 @@ func (h *statshandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
case *stats.InPayload:
h.jctx.stats.totalInPayloadLength += uint64(s.(*stats.InPayload).Length)
h.jctx.stats.totalInPayloadWireLength += uint64(s.(*stats.InPayload).WireLength)
if *stateHandler && h.jctx.config.InternalJtimon.CsvLog != "" {
switch v := (s.(*stats.InPayload).Payload).(type) {
case *na_pb.OpenConfigData:
updateStats(h.jctx, v, false)
for idx, kv := range v.Kv {
updateStatsKV(h.jctx, false, 0)
switch kvvalue := kv.Value.(type) {
case *na_pb.KeyValue_UintValue:
if kv.Key == "__timestamp__" {
var re_c_ts uint64 = 0
var re_p_get_ts uint64 = 0
if len(v.Kv) > idx+2 {
nextKV := v.Kv[idx+1]
if nextKV.Key == "__junos_re_stream_creation_timestamp__" {
re_c_ts = nextKV.GetUintValue()
}
nextnextKV := v.Kv[idx+2]
if nextnextKV.Key == "__junos_re_payload_get_timestamp__" {
re_p_get_ts = nextnextKV.GetUintValue()
}
}

//"sensor-path", "sequence-number", "component-id", "sub-component-id", "packet-size", "p-ts", "e-ts", "re-stream-creation-ts", "re-payload-get-ts"))
h.jctx.config.InternalJtimon.csvLogger.Printf(
fmt.Sprintf("%s,%d,%d,%d,%d,%d,%d,%d,%d\n",
v.Path, v.SequenceNumber, v.ComponentId, v.SubComponentId, s.(*stats.InPayload).Length, v.Timestamp, kvvalue.UintValue, re_c_ts, re_p_get_ts))
}
}
}
case *gnmi_pb.SubscribeResponse:
stat := h.getKPIStats(v)
if stat != nil && stat.Timestamp != 0 {
path := stat.SensorName + ":" + stat.Streamed_path + ":" + stat.Path + ":" + stat.Component
h.jctx.config.InternalJtimon.csvLogger.Printf(
fmt.Sprintf("%s,%d,%d,%d,%d,%d,%d,%d,%d\n",
path, stat.SequenceNumber, stat.ComponentId, stat.SubComponentId,
s.(*stats.InPayload).Length, stat.notif_timestamp, int64(stat.Timestamp*uint64(1000000)),
int64(stat.re_stream_creation_timestamp*uint64(1000000)),
int64(stat.re_payload_get_timestamp*uint64(1000000)),
),
)
}
}
}
case *stats.InTrailer:
case *stats.End:
default:
}
}

func (h *statshandler) getKPIStats(subResponse *gnmi_pb.SubscribeResponse) *kpiStats {
var jHdrPresent bool
stats := new(kpiStats)
notfn := subResponse.GetUpdate()
if notfn == nil {
return nil
}
stats.notif_timestamp = notfn.Timestamp
extns := subResponse.GetExtension()

if extns != nil {
var extIds []gnmi_ext1.ExtensionID
for _, ext := range extns {
regExtn := ext.GetRegisteredExt()
if (regExtn.GetId()) != gnmi_ext1.ExtensionID_EID_JUNIPER_TELEMETRY_HEADER {
extIds = append(extIds, regExtn.GetId())
continue
}

jHdrPresent = true
var hdr gnmi_juniper_header_ext.GnmiJuniperTelemetryHeaderExtension
msg := regExtn.GetMsg()
err := proto.Unmarshal(msg, &hdr)
if err != nil {
log.Fatal("unmarshaling error: ", err)
}

stats.ComponentId = hdr.ComponentId
stats.SequenceNumber = hdr.SequenceNumber
stats.Path = hdr.SubscribedPath
stats.SubComponentId = hdr.SubComponentId
stats.Component = hdr.Component
stats.Streamed_path = hdr.StreamedPath
stats.SensorName = hdr.SensorName

if hdr.ExportTimestamp > 0 {
stats.Timestamp = uint64(hdr.ExportTimestamp)
}
if hdr.PayloadGetTimestamp > 0 {
stats.re_payload_get_timestamp = uint64(hdr.PayloadGetTimestamp)
}
if hdr.StreamCreationTimestamp > 0 {
stats.re_stream_creation_timestamp = uint64(hdr.StreamCreationTimestamp)
}
break
}
if !jHdrPresent {
jLog(h.jctx, fmt.Sprintf(
"Juniper header extension not present, available extensions: %v", extIds))
}
}
M-Vivek-Juniper marked this conversation as resolved.
Show resolved Hide resolved
return stats

}

func updateStats(jctx *JCtx, ocData *na_pb.OpenConfigData, needLock bool) {
if !*stateHandler {
return
Expand Down Expand Up @@ -146,3 +265,46 @@ func printSummary(jctx *JCtx) {
s += fmt.Sprintf("\n")
jLog(jctx, fmt.Sprintf("\n%s\n", s))
}

func isCsvStatsEnabled(jctx *JCtx) bool {
if *stateHandler && jctx.config.InternalJtimon.CsvLog != "" {
return true
}
return false
}

func csvStatsLogInit(jctx *JCtx) {
if !*stateHandler && jctx.config.InternalJtimon.CsvLog == "" {
return
}
var out *os.File
var err error

csvStatsFile := "csv-stats.csv"
if jctx.config.InternalJtimon.CsvLog == "" {
jctx.config.InternalJtimon.CsvLog = csvStatsFile
}

out, err = os.OpenFile(jctx.config.InternalJtimon.CsvLog, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
if err != nil {
log.Printf("Could not create csv stats file(%s): %v\n", csvStatsFile, err)
}

if out != nil {
flags := 0

jctx.config.InternalJtimon.csvLogger = log.New(out, "", flags)
jctx.config.InternalJtimon.csvOut = out

log.Printf("Writing stats in %s for %s:%d [in csv format]\n",
jctx.config.InternalJtimon.CsvLog, jctx.config.Host, jctx.config.Port)
}
}

func csvStatsLogStop(jctx *JCtx) {
if jctx.config.InternalJtimon.csvOut != nil {
jctx.config.InternalJtimon.csvOut.Close()
jctx.config.InternalJtimon.csvOut = nil
jctx.config.InternalJtimon.csvLogger = nil
}
}
31 changes: 18 additions & 13 deletions subscribe_gnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,23 +515,28 @@ func subscribegNMI(conn *grpc.ClientConn, jctx *JCtx, cfg Config, paths []PathsC
// 3. Receive rsp
go func() {
var (
rsp *gnmi.SubscribeResponse
rsp *gnmi.SubscribeResponse
err1 error
)

jLog(jctx, fmt.Sprintf("gNMI host: %v, receiving data..", hostname))
for {
rsp, err = gNMISubHandle.Recv()
if err == io.EOF {
rsp, err1 = gNMISubHandle.Recv()
if err1 == io.EOF {
printSummary(jctx)
jLog(jctx, fmt.Sprintf("gNMI host: %v, received eof", hostname))
datach <- SubRcConnRetry
return
}

if err != nil {
jLog(jctx, fmt.Sprintf("gNMI host: %v, receive response failed: %v", hostname, err))
sc, _ := status.FromError(err)

if err1 != nil {
jLog(jctx, fmt.Sprintf("gNMI host: %v, receive response failed: %v", hostname, err1))
sc, sErr := status.FromError(err)
if !sErr {
jLog(jctx, fmt.Sprintf("Failed to retrieve status from error: %v", sErr))
datach <- SubRcConnRetry
return
}
/*
* Unavailable is just a cover-up for JUNOS, ideally the device is expected to return:
* 1. Unimplemented if RPC is not available yet
Expand All @@ -547,16 +552,16 @@ func subscribegNMI(conn *grpc.ClientConn, jctx *JCtx, cfg Config, paths []PathsC
}

if *noppgoroutines {
err = gnmiHandleResponse(jctx, rsp)
if err != nil && strings.Contains(err.Error(), gGnmiJtimonIgnoreErrorSubstr) {
jLog(jctx, fmt.Sprintf("gNMI host: %v, parsing response failed: %v", hostname, err))
gnmiErr := gnmiHandleResponse(jctx, rsp)
if gnmiErr != nil && strings.Contains(gnmiErr.Error(), gGnmiJtimonIgnoreErrorSubstr) {
jLog(jctx, fmt.Sprintf("gNMI host: %v, parsing response failed: %v", hostname, gnmiErr))
continue
}
} else {
go func() {
err = gnmiHandleResponse(jctx, rsp)
if err != nil && strings.Contains(err.Error(), gGnmiJtimonIgnoreErrorSubstr) {
jLog(jctx, fmt.Sprintf("gNMI host: %v, parsing response failed: %v", hostname, err))
gnmiErr1 := gnmiHandleResponse(jctx, rsp)
if gnmiErr1 != nil && strings.Contains(gnmiErr1.Error(), gGnmiJtimonIgnoreErrorSubstr) {
jLog(jctx, fmt.Sprintf("gNMI host: %v, parsing response failed: %v", hostname, gnmiErr1))
}
}()
}
Expand Down
5 changes: 5 additions & 0 deletions tests/data/cisco-ios-xr/config/xr-all-influx.log
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ Running config of JTIMON:
"write-per-measurement": false
},
"kafka": null,
"internal-jtimon": {
"data-log-file": "",
"csv-log-file": "",
"csv-stats": false
},
"paths": [
{
"path": "SUB_JTIMON_ALL",
Expand Down
5 changes: 5 additions & 0 deletions tests/data/cisco-ios-xr/config/xr-wdsysmon-influx.log
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ Running config of JTIMON:
"write-per-measurement": false
},
"kafka": null,
"internal-jtimon": {
"data-log-file": "",
"csv-log-file": "",
"csv-stats": false
},
"paths": [
{
"path": "sub_wdsysmon-fd",
Expand Down
Loading