Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tui): Enhancing the output of watch and stat commands #59

Merged
merged 12 commits into from
Oct 19, 2024
34 changes: 19 additions & 15 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
"kyanos/agent/compatible"
"kyanos/agent/conn"
"kyanos/agent/protocol"
"kyanos/agent/render"
"kyanos/agent/render/stat"
"kyanos/agent/render/watch"
"kyanos/bpf"
"kyanos/bpf/loader"
"kyanos/common"
"os"
"os/signal"
"syscall"
"time"

"github.com/cilium/ebpf/rlimit"
)
Expand All @@ -40,16 +40,7 @@ func SetupAgent(options ac.AgentOptions) {
statRecorder := analysis.InitStatRecorder()

var recordsChannel chan *anc.AnnotatedRecord = nil
if options.AnalysisEnable {
recordsChannel = make(chan *anc.AnnotatedRecord, 1000)
resultChannel := make(chan []*analysis.ConnStat, 1000)
renderStopper := make(chan int)
analyzer := analysis.CreateAnalyzer(recordsChannel, &options.AnalysisOptions, resultChannel, renderStopper, options.Ctx)
go analyzer.Run()

render := render.CreateRender(resultChannel, renderStopper, analyzer.AnalysisOptions)
go render.Run()
}
recordsChannel = make(chan *anc.AnnotatedRecord, 1000)

pm := conn.InitProcessorManager(options.ProcessorsNum, connManager, options.MessageFilter, options.LatencyFilter, options.SizeFilter, options.TraceSide)
conn.RecordFunc = func(r protocol.Record, c *conn.Connection4) error {
Expand Down Expand Up @@ -107,9 +98,22 @@ func SetupAgent(options ac.AgentOptions) {
if options.InitCompletedHook != nil {
options.InitCompletedHook()
}
for !stop {
time.Sleep(time.Second * 1)

if options.AnalysisEnable {
resultChannel := make(chan []*analysis.ConnStat, 1000)
renderStopper := make(chan int)
analyzer := analysis.CreateAnalyzer(recordsChannel, &options.AnalysisOptions, resultChannel, renderStopper, options.Ctx)
go analyzer.Run()
stat.StartStatRender(ctx, resultChannel, options.AnalysisOptions)
// render := render.CreateRender(resultChannel, renderStopper, analyzer.AnalysisOptions)
// go render.Run()
// for !stop {
// time.Sleep(time.Second * 1)
// }
} else {
watch.RunWatchRender(ctx, recordsChannel, options.WatchOptions)
}
common.AgentLog.Infoln("Kyanos Stopped")
common.AgentLog.Infoln("Kyanos Stopped: ", stop)

return
}
49 changes: 32 additions & 17 deletions agent/analysis/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ type aggregator struct {
}

func createAggregatorWithHumanReadableClassId(humanReadableClassId string,
classId ClassId, aggregateOption *analysis_common.AnalysisOptions) *aggregator {
classId analysis_common.ClassId, aggregateOption *analysis_common.AnalysisOptions) *aggregator {
aggregator := createAggregator(classId, aggregateOption)
aggregator.HumanReadbleClassId = humanReadableClassId
return aggregator
}

func createAggregator(classId ClassId, aggregateOption *analysis_common.AnalysisOptions) *aggregator {
func createAggregator(classId analysis_common.ClassId, aggregateOption *analysis_common.AnalysisOptions) *aggregator {
aggregator := aggregator{}
aggregator.reset(classId, aggregateOption)
return &aggregator
}

func (a *aggregator) reset(classId ClassId, aggregateOption *analysis_common.AnalysisOptions) {
func (a *aggregator) reset(classId analysis_common.ClassId, aggregateOption *analysis_common.AnalysisOptions) {
a.AnalysisOptions = aggregateOption
a.ConnStat = &ConnStat{
ClassId: classId,
Expand Down Expand Up @@ -104,50 +104,63 @@ type Analyzer struct {
Classfier
*analysis_common.AnalysisOptions
common.SideEnum // 那一边的统计指标TODO 根据参数自动推断
Aggregators map[ClassId]*aggregator
Aggregators map[analysis_common.ClassId]*aggregator
recordsChannel <-chan *analysis_common.AnnotatedRecord
stopper <-chan int
resultChannel chan<- []*ConnStat
renderStopper chan int
ticker *time.Ticker
tickerC <-chan time.Time
ctx context.Context
recordReceived int
}

func CreateAnalyzer(recordsChannel <-chan *analysis_common.AnnotatedRecord, showOption *analysis_common.AnalysisOptions, resultChannel chan<- []*ConnStat, renderStopper chan int, ctx context.Context) *Analyzer {
func CreateAnalyzer(recordsChannel <-chan *analysis_common.AnnotatedRecord, opts *analysis_common.AnalysisOptions, resultChannel chan<- []*ConnStat, renderStopper chan int, ctx context.Context) *Analyzer {
stopper := make(chan int)
// ac.AddToFastStopper(stopper)
opts.Init()
analyzer := &Analyzer{
Classfier: getClassfier(showOption.ClassfierType),
Classfier: getClassfier(opts.ClassfierType),
recordsChannel: recordsChannel,
Aggregators: make(map[ClassId]*aggregator),
AnalysisOptions: showOption,
Aggregators: make(map[analysis_common.ClassId]*aggregator),
AnalysisOptions: opts,
stopper: stopper,
resultChannel: resultChannel,
renderStopper: renderStopper,
ctx: ctx,
}
if showOption.Interval > 0 {
analyzer.ticker = time.NewTicker(time.Second * time.Duration(showOption.Interval))
analyzer.tickerC = analyzer.ticker.C
} else {
opts.CurrentReceivedSamples = func() int {
return analyzer.recordReceived
}
if analyzer.AnalysisOptions.EnableBatchModel() {
analyzer.tickerC = make(<-chan time.Time)
} else {
analyzer.ticker = time.NewTicker(time.Second * 1)
analyzer.tickerC = analyzer.ticker.C
}
return analyzer
}

func (a *Analyzer) Run() {
defer close(a.resultChannel)
for {
select {
// case <-a.stopper:
case <-a.ctx.Done():
if a.AnalysisOptions.Interval == 0 {
a.resultChannel <- a.harvest()
time.Sleep(1 * time.Second)
}
a.renderStopper <- 1
return
case record := <-a.recordsChannel:
a.analyze(record)
a.recordReceived++
if a.EnableBatchModel() && a.recordReceived == a.TargetSamples {
a.resultChannel <- a.harvest()
return
}
case <-a.AnalysisOptions.HavestSignal:
a.resultChannel <- a.harvest()
if a.AnalysisOptions.EnableBatchModel() {
return
}
case <-a.tickerC:
a.resultChannel <- a.harvest()
}
Expand All @@ -161,7 +174,9 @@ func (a *Analyzer) harvest() []*ConnStat {
// aggregator.reset(classId, a.analysis_common.AnalysisOptions)
result = append(result, connstat)
}
a.Aggregators = make(map[ClassId]*aggregator)
if a.AnalysisOptions.CleanWhenHarvest {
a.Aggregators = make(map[analysis_common.ClassId]*aggregator)
}
return result
}

Expand Down
58 changes: 18 additions & 40 deletions agent/analysis/classfier.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,56 +8,34 @@ import (
"kyanos/bpf"
)

type Classfier func(*anc.AnnotatedRecord) (ClassId, error)
type Classfier func(*anc.AnnotatedRecord) (anc.ClassId, error)
type ClassIdAsHumanReadable func(*anc.AnnotatedRecord) string

var ClassfierTypeNames = map[anc.ClassfierType]string{
None: "none",
Conn: "conn",
RemotePort: "remote-port",
LocalPort: "local-port",
RemoteIp: "remote-ip",
Protocol: "protocol",
HttpPath: "http-path",
RedisCommand: "redis-command",
}

const (
None anc.ClassfierType = iota
Conn
RemotePort
LocalPort
RemoteIp
Protocol

// Http
HttpPath

// Redis
RedisCommand
)

type ClassId string

var classfierMap map[anc.ClassfierType]Classfier
var classIdHumanReadableMap map[anc.ClassfierType]ClassIdAsHumanReadable

func init() {
classfierMap = make(map[anc.ClassfierType]Classfier)
classfierMap[None] = func(ar *anc.AnnotatedRecord) (ClassId, error) { return "none", nil }
classfierMap[Conn] = func(ar *anc.AnnotatedRecord) (ClassId, error) {
return ClassId(ar.ConnDesc.Identity()), nil
classfierMap[anc.None] = func(ar *anc.AnnotatedRecord) (anc.ClassId, error) { return "none", nil }
classfierMap[anc.Conn] = func(ar *anc.AnnotatedRecord) (anc.ClassId, error) {
return anc.ClassId(ar.ConnDesc.Identity()), nil
}
classfierMap[anc.RemotePort] = func(ar *anc.AnnotatedRecord) (anc.ClassId, error) {
return anc.ClassId(fmt.Sprintf("%d", ar.RemotePort)), nil
}
classfierMap[anc.LocalPort] = func(ar *anc.AnnotatedRecord) (anc.ClassId, error) {
return anc.ClassId(fmt.Sprintf("%d", ar.LocalPort)), nil
}
classfierMap[anc.RemoteIp] = func(ar *anc.AnnotatedRecord) (anc.ClassId, error) { return anc.ClassId(ar.RemoteAddr.String()), nil }
classfierMap[anc.Protocol] = func(ar *anc.AnnotatedRecord) (anc.ClassId, error) {
return anc.ClassId(fmt.Sprintf("%d", ar.Protocol)), nil
}
classfierMap[RemotePort] = func(ar *anc.AnnotatedRecord) (ClassId, error) { return ClassId(fmt.Sprintf("%d", ar.RemotePort)), nil }
classfierMap[LocalPort] = func(ar *anc.AnnotatedRecord) (ClassId, error) { return ClassId(fmt.Sprintf("%d", ar.LocalPort)), nil }
classfierMap[RemoteIp] = func(ar *anc.AnnotatedRecord) (ClassId, error) { return ClassId(ar.RemoteAddr.String()), nil }
classfierMap[Protocol] = func(ar *anc.AnnotatedRecord) (ClassId, error) { return ClassId(fmt.Sprintf("%d", ar.Protocol)), nil }

classIdHumanReadableMap = make(map[anc.ClassfierType]ClassIdAsHumanReadable)
classIdHumanReadableMap[Conn] = func(ar *anc.AnnotatedRecord) string {
return ar.ConnDesc.String()
classIdHumanReadableMap[anc.Conn] = func(ar *anc.AnnotatedRecord) string {
return ar.ConnDesc.SimpleString()
}
classIdHumanReadableMap[HttpPath] = func(ar *anc.AnnotatedRecord) string {
classIdHumanReadableMap[anc.HttpPath] = func(ar *anc.AnnotatedRecord) string {
httpReq, ok := ar.Record.Request().(*protocol.ParsedHttpRequest)
if !ok {
return "__not_a_http_req__"
Expand All @@ -66,7 +44,7 @@ func init() {
}
}

classIdHumanReadableMap[Protocol] = func(ar *anc.AnnotatedRecord) string {
classIdHumanReadableMap[anc.Protocol] = func(ar *anc.AnnotatedRecord) string {
return bpf.ProtocolNamesMap[bpf.AgentTrafficProtocolT(ar.Protocol)]
}
}
Expand Down
31 changes: 31 additions & 0 deletions agent/analysis/common/classfier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package common

var ClassfierTypeNames = map[ClassfierType]string{
None: "none",
Conn: "conn",
RemotePort: "remote-port",
LocalPort: "local-port",
RemoteIp: "remote-ip",
Protocol: "protocol",
HttpPath: "http-path",
RedisCommand: "redis-command",
Default: "default",
}

const (
Default ClassfierType = iota
None
Conn
RemotePort
LocalPort
RemoteIp
Protocol

// Http
HttpPath

// Redis
RedisCommand
)

type ClassId string
Loading
Loading