Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use informer code from beyla-k8s-cache #1256

Merged
merged 29 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
4f74243
simplifying informer and use k8s-cache one
mariomac Oct 11, 2024
2df7f2d
Going on with refactoring
mariomac Oct 12, 2024
b540435
fix k8s test
mariomac Oct 12, 2024
04bd355
changes compiling despite not passing unit tests
mariomac Oct 12, 2024
2847945
passed watcher_kube_test
mariomac Oct 12, 2024
4df5a2a
Unit tests passing
mariomac Oct 13, 2024
96a60bc
some fixes
mariomac Oct 14, 2024
807bf5f
Merge branch 'main' of github.com:grafana/ebpf-autoinstrument
mariomac Oct 14, 2024
11a4ac5
Fixed race condition in k8s initialization code
mariomac Oct 14, 2024
8231b54
Fix formatting
mariomac Oct 14, 2024
a87d83a
Fix linting
mariomac Oct 14, 2024
5ee6816
Fix linting and some race conditions in tests
mariomac Oct 14, 2024
3fc9f68
fix linting
mariomac Oct 14, 2024
f141db5
Fix goimports-reviser
mariomac Oct 14, 2024
ce02bed
fix checkfmt
mariomac Oct 14, 2024
c89d80b
amend resplicaset testing
mariomac Oct 14, 2024
034ce03
Improve watcher kube enricher test
mariomac Oct 15, 2024
884bb84
temptative fix to a race condition that prevented kube finder pipelin…
mariomac Oct 15, 2024
54d72da
Fixed watcherkubeenricher
mariomac Oct 16, 2024
37d5a62
Reverted go.mod replacement
mariomac Oct 16, 2024
8491bb2
fix compilation error
mariomac Oct 16, 2024
26d3ee2
fixed all tests
mariomac Oct 16, 2024
a0b5cac
Merge branch 'main' of github.com:grafana/ebpf-autoinstrument
mariomac Oct 16, 2024
e032873
Implemented multiple owners
mariomac Oct 16, 2024
8bccba4
reworked code and added some comments
mariomac Oct 16, 2024
cfa2466
Merge branch 'main' of github.com:grafana/ebpf-autoinstrument into si…
mariomac Oct 16, 2024
b990dfd
update vendor
mariomac Oct 16, 2024
0b710ce
Fix wrong context cancellation
mariomac Oct 16, 2024
dd27d9e
Adding rafael's comments
mariomac Oct 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,11 @@ KIND = $(TOOLS_DIR)/kind
DASHBOARD_LINTER = $(TOOLS_DIR)/dashboard-linter
GINKGO = $(TOOLS_DIR)/ginkgo

GOIMPORTS_REVISER_ARGS = -company-prefixes github.com/grafana -project-name github.com/grafana/beyla/

define check_format
$(shell $(foreach FILE, $(shell find . -name "*.go" -not -path "**/vendor/*"), \
$(GOIMPORTS_REVISER) -company-prefixes github.com/grafana -list-diff -output stdout $(FILE);))
$(GOIMPORTS_REVISER) $(GOIMPORTS_REVISER_ARGS) -list-diff -output stdout $(FILE);))
endef


Expand Down Expand Up @@ -121,7 +123,7 @@ prereqs: install-hooks
fmt: prereqs
@echo "### Formatting code and fixing imports"
@$(foreach FILE, $(shell find . -name "*.go" -not -path "**/vendor/*"), \
$(GOIMPORTS_REVISER) -company-prefixes github.com/grafana $(FILE);)
$(GOIMPORTS_REVISER) $(GOIMPORTS_REVISER_ARGS) $(FILE);)

.PHONY: checkfmt
checkfmt:
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.1
github.com/grafana/beyla-k8s-cache v0.0.0-20241016170428-8b4efb83c725
github.com/grafana/go-offsets-tracker v0.1.7
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/mariomac/guara v0.0.0-20230621100729-42bd7716e524
Expand Down Expand Up @@ -62,7 +63,6 @@ require (
google.golang.org/grpc v1.67.1
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.31.1
k8s.io/apimachinery v0.31.1
k8s.io/client-go v0.31.1
sigs.k8s.io/e2e-framework v0.3.0
Expand Down Expand Up @@ -171,9 +171,9 @@ require (
golang.org/x/time v0.5.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
k8s.io/api v0.31.1 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20240620174524-b456828f718b // indirect
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grafana/beyla-k8s-cache v0.0.0-20241016170428-8b4efb83c725 h1:auLp/060bWP2uWVKH3MIAzL61ttGujsNXry6NI8Un/M=
github.com/grafana/beyla-k8s-cache v0.0.0-20241016170428-8b4efb83c725/go.mod h1:+Y9gUSD7rptXXZ0OL0aRcBLAgzBm+nE4OH1QB9LqHYc=
github.com/grafana/go-offsets-tracker v0.1.7 h1:2zBQ7iiGzvyXY7LA8kaaSiEqH/Yx82UcfRabbY5aOG4=
github.com/grafana/go-offsets-tracker v0.1.7/go.mod h1:qcQdu7zlUKIFNUdBJlLyNHuJGW0SKWKjkrN6jtt+jds=
github.com/grafana/opentelemetry-go v1.28.0-grafana.3 h1:vExZiZKDZTdDi7fP1GG3GOGuoZ0GNu76408tNXfsnD0=
Expand Down Expand Up @@ -435,8 +437,6 @@ google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWn
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/evanphx/json-patch.v4 v4.12.0 h1:n6jtcsulIzXPJaxegRbvFNNrZDjbij7ny3gmSPG+6V4=
gopkg.in/evanphx/json-patch.v4 v4.12.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
5 changes: 0 additions & 5 deletions pkg/components/beyla.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,6 @@ func RunBeyla(ctx context.Context, cfg *beyla.Config) error {

func setupAppO11y(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config) error {
slog.Info("starting Beyla in Application Observability mode")
// TODO: when we split Beyla in two processes with different permissions, this code can be split:
// in two parts:
// 1st process (privileged) - Invoke FindTarget, which also mounts the BPF maps
// 2nd executable (unprivileged) - Invoke ReadAndForward, receiving the BPF map mountpoint as argument

instr := appolly.New(ctx, ctxInfo, config)
if err := instr.FindAndInstrument(); err != nil {
return fmt.Errorf("can't find target process: %w", err)
Expand Down
14 changes: 6 additions & 8 deletions pkg/internal/appolly/appolly.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/grafana/beyla/pkg/internal/pipe"
"github.com/grafana/beyla/pkg/internal/pipe/global"
"github.com/grafana/beyla/pkg/internal/request"
"github.com/grafana/beyla/pkg/internal/transform/kube"
)

func log() *slog.Logger {
Expand Down Expand Up @@ -124,17 +123,16 @@ func setupKubernetes(ctx context.Context, ctxInfo *global.ContextInfo) {
return
}

informer, err := ctxInfo.K8sInformer.Get(ctx)
if err != nil {
if err := refreshK8sInformerCache(ctx, ctxInfo); err != nil {
slog.Error("can't init Kubernetes informer. You can't setup Kubernetes discovery and your"+
" traces won't be decorated with Kubernetes metadata", "error", err)
ctxInfo.K8sInformer.ForceDisable()
return
}
}

if ctxInfo.AppO11y.K8sDatabase, err = kube.StartDatabase(informer); err != nil {
slog.Error("can't setup Kubernetes database. Your traces won't be decorated with Kubernetes metadata",
"error", err)
ctxInfo.K8sInformer.ForceDisable()
}
func refreshK8sInformerCache(ctx context.Context, ctxInfo *global.ContextInfo) error {
// force the cache to be populated and cached
_, err := ctxInfo.K8sInformer.Get(ctx)
return err
}
3 changes: 2 additions & 1 deletion pkg/internal/discover/attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func (ta *TraceAttacher) attacherLoop() (pipe.FinalFunc[[]Event[ebpf.Instrumenta
mainLoop:
for instrumentables := range in {
for _, instr := range instrumentables {
ta.log.Debug("Instrumentable", "len", len(instrumentables), "inst", instr)
ta.log.Debug("Instrumentable", "created", instr.Type, "type", instr.Obj.Type,
"exec", instr.Obj.FileInfo.CmdExePath, "pid", instr.Obj.FileInfo.Pid)
switch instr.Type {
case EventCreated:
ta.processInstances.Inc(instr.Obj.FileInfo.Ino)
Expand Down
22 changes: 15 additions & 7 deletions pkg/internal/discover/container_updater.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,45 @@
package discover

import (
"context"
"fmt"
"log/slog"

"github.com/mariomac/pipes/pipe"

"github.com/grafana/beyla/pkg/internal/ebpf"
"github.com/grafana/beyla/pkg/internal/transform/kube"
"github.com/grafana/beyla/pkg/internal/kube"
)

// ContainerDBUpdaterProvider is a stage in the Process Finder pipeline that will be
// enabled only if Kubernetes decoration is enabled.
// It just updates part of the kubernetes database when a new process is discovered.
func ContainerDBUpdaterProvider(enabled bool, db *kube.Database) pipe.MiddleProvider[[]Event[ebpf.Instrumentable], []Event[ebpf.Instrumentable]] {
func ContainerDBUpdaterProvider(ctx context.Context, meta kubeMetadataProvider) pipe.MiddleProvider[[]Event[ebpf.Instrumentable], []Event[ebpf.Instrumentable]] {
return func() (pipe.MiddleFunc[[]Event[ebpf.Instrumentable], []Event[ebpf.Instrumentable]], error) {
if !enabled {
if !meta.IsKubeEnabled() {
return pipe.Bypass[[]Event[ebpf.Instrumentable]](), nil
}
return updateLoop(db), nil
store, err := meta.Get(ctx)
if err != nil {
return nil, fmt.Errorf("instantiating ContainerDBUpdater: %w", err)
}
return updateLoop(store), nil
}
}

func updateLoop(db *kube.Database) pipe.MiddleFunc[[]Event[ebpf.Instrumentable], []Event[ebpf.Instrumentable]] {
func updateLoop(db *kube.Store) pipe.MiddleFunc[[]Event[ebpf.Instrumentable], []Event[ebpf.Instrumentable]] {
log := slog.With("component", "ContainerDBUpdater")
return func(in <-chan []Event[ebpf.Instrumentable], out chan<- []Event[ebpf.Instrumentable]) {
for instrumentables := range in {
for i := range instrumentables {
ev := &instrumentables[i]
switch ev.Type {
case EventCreated:
log.Debug("adding process", "pid", ev.Obj.FileInfo.Pid)
db.AddProcess(uint32(ev.Obj.FileInfo.Pid))
case EventDeleted:
// we don't need to handle process deletion from here, as the Kubernetes informer will
// remove the process from the database when the Pod that contains it is deleted.
// However we clean-up the performance related caches, in case we miss pod removal event
db.CleanProcessCaches(ev.Obj.FileInfo.Ns)
}
}
out <- instrumentables
Expand Down
3 changes: 1 addition & 2 deletions pkg/internal/discover/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ func (pf *ProcessFinder) Start() (<-chan *ebpf.Instrumentable, <-chan *ebpf.Inst
WatcherKubeEnricherProvider(pf.ctx, pf.ctxInfo.K8sInformer))
pipe.AddMiddleProvider(gb, criteriaMatcher, CriteriaMatcherProvider(pf.cfg))
pipe.AddMiddleProvider(gb, execTyper, ExecTyperProvider(pf.cfg, pf.ctxInfo.Metrics))
pipe.AddMiddleProvider(gb, containerDBUpdater,
ContainerDBUpdaterProvider(pf.ctxInfo.K8sInformer.IsKubeEnabled(), pf.ctxInfo.AppO11y.K8sDatabase))
pipe.AddMiddleProvider(gb, containerDBUpdater, ContainerDBUpdaterProvider(pf.ctx, pf.ctxInfo.K8sInformer))
pipe.AddFinalProvider(gb, traceAttacher, TraceAttacherProvider(&TraceAttacher{
Cfg: pf.cfg,
Ctx: pf.ctx,
Expand Down
Loading
Loading