Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test ringbuf #2218

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bpf/include/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ static __u64 BPF_FUNC(get_attach_cookie, void *ctx);

static long BPF_FUNC(loop, __u32 nr_loops, void *callback_fn, void *callback_ctx, __u64 flags);

static long BPF_FUNC(ringbuf_output, void *data, uint64_t size, uint64_t flags);
static long BPF_FUNC(ringbuf_output, void *ringbuf, void *data, uint64_t size, uint64_t flags);
static void BPF_FUNC(ringbuf_reserve, void *ringbuf, uint64_t size, uint64_t flags);
static void BPF_FUNC(ringbuf_submit, void *data, uint64_t flags);
static void BPF_FUNC(ringbuf_discard, void *data, uint64_t flags);
Expand Down
5 changes: 2 additions & 3 deletions bpf/lib/bpf_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ struct event {
};

struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
__type(key, int);
__type(value, struct event);
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(max_entries, 8LU * 1024LU * 1024LU); // 8MB
} tcpmon_map SEC(".maps");

#endif // __BPF_EVENT_H
2 changes: 1 addition & 1 deletion bpf/lib/process.h
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ static inline __attribute__((always_inline)) void perf_event_output_metric(void
__u32 zero = 0;
long err;

err = perf_event_output(ctx, map, flags, data, size);
err = ringbuf_output(map, data, size, 0);
if (err < 0) {
valp = map_lookup_elem(&tg_stats_map, &zero);
if (valp)
Expand Down
3 changes: 1 addition & 2 deletions bpf/test/bpf_lseek.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ test_lseek(struct sys_enter_lseek_args *ctx)
msg.common.ktime = ktime_get_ns();
msg.common.size = size;
msg.arg0 = get_smp_processor_id();
perf_event_output(ctx, &tcpmon_map, BPF_F_CURRENT_CPU, &msg,
size);
ringbuf_output(&tcpmon_map, &msg, size, 0);
}

return 0;
Expand Down
36 changes: 2 additions & 34 deletions pkg/bpf/perf_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,15 @@ import (
"io"
"os"
"path/filepath"
"runtime"
"strings"

"golang.org/x/sys/unix"
)

const (
PossibleCPUSysfsPath = "/sys/devices/system/cpu/possible"
)

type PerfEventConfig struct {
NumCpus int
NumPages int
MapName string
Type int
Config int
SampleType int
WakeupEvents int
MapName string
}

// GetNumPossibleCPUs returns a total number of possible CPUS, i.e. CPUs that
Expand Down Expand Up @@ -74,30 +65,7 @@ func getNumPossibleCPUsFromReader(r io.Reader) int {
// DefaultPerfEventConfig returns the default perf event configuration. It
// relies on the map root to be set.
func DefaultPerfEventConfig() *PerfEventConfig {
numCpus := GetNumPossibleCPUs()
if numCpus == 0 {
numCpus = runtime.NumCPU()
}
return &PerfEventConfig{
MapName: filepath.Join(MapPrefixPath(), eventsMapName),
Type: PERF_TYPE_SOFTWARE,
Config: PERF_COUNT_SW_BPF_OUTPUT,
SampleType: PERF_SAMPLE_RAW,
WakeupEvents: 1,
NumCpus: numCpus,
NumPages: 128,
}
}

func UpdateElementFromPointers(fd int, structPtr, sizeOfStruct uintptr) error {
ret, _, err := unix.Syscall(
unix.SYS_BPF,
BPF_MAP_UPDATE_ELEM,
structPtr,
sizeOfStruct,
)
if ret != 0 || err != 0 {
return fmt.Errorf("Unable to update element for map with file descriptor %d: %s", fd, err)
MapName: filepath.Join(MapPrefixPath(), eventsMapName),
}
return nil
}
61 changes: 6 additions & 55 deletions pkg/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"bytes"
"context"
"fmt"
"math"
"os"
"runtime"
"strings"
Expand All @@ -16,7 +15,7 @@ import (
"time"

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/perf"
"github.com/cilium/ebpf/ringbuf"
"github.com/cilium/tetragon/pkg/api/ops"
"github.com/cilium/tetragon/pkg/api/readyapi"
"github.com/cilium/tetragon/pkg/bpf"
Expand All @@ -33,10 +32,6 @@ import (
"github.com/sirupsen/logrus"
)

const (
perCPUBufferBytes = 65535
)

var (
eventHandler = make(map[uint8]func(r *bytes.Reader) ([]Event, error))

Expand Down Expand Up @@ -144,23 +139,6 @@ func (k *Observer) receiveEvent(data []byte) {
}
}

// Gets final size for single perf ring buffer rounded from
// passed size argument (kindly borrowed from ebpf/cilium)
func perfBufferSize(perCPUBuffer int) int {
pageSize := os.Getpagesize()

// Smallest whole number of pages
nPages := (perCPUBuffer + pageSize - 1) / pageSize

// Round up to nearest power of two number of pages
nPages = int(math.Pow(2, math.Ceil(math.Log2(float64(nPages)))))

// Add one for metadata
nPages++

return nPages * pageSize
}

func sizeWithSuffix(size int) string {
suffix := [4]string{"", "K", "M", "G"}

Expand All @@ -173,26 +151,6 @@ func sizeWithSuffix(size int) string {
return fmt.Sprintf("%d%s", size, suffix[i])
}

func (k *Observer) getRBSize(cpus int) int {
var size int

if option.Config.RBSize == 0 && option.Config.RBSizeTotal == 0 {
size = perCPUBufferBytes
} else if option.Config.RBSize != 0 {
size = option.Config.RBSize
} else {
size = option.Config.RBSizeTotal / int(cpus)
}

cpuSize := perfBufferSize(size)
totalSize := cpuSize * cpus

k.log.WithField("percpu", sizeWithSuffix(cpuSize)).
WithField("total", sizeWithSuffix(totalSize)).
Info("Perf ring buffer size (bytes)")
return size
}

func (k *Observer) getRBQueueSize() int {
size := option.Config.RBQueueSize
if size == 0 {
Expand All @@ -211,11 +169,9 @@ func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error {
}
defer perfMap.Close()

rbSize := k.getRBSize(int(perfMap.MaxEntries()))
perfReader, err := perf.NewReader(perfMap, rbSize)

rd, err := ringbuf.NewReader(perfMap)
if err != nil {
return fmt.Errorf("creating perf array reader failed: %w", err)
return fmt.Errorf("opening ringbuf reader: %w", err)
}

// Inform caller that we're about to start processing events.
Expand All @@ -224,7 +180,7 @@ func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error {

// We spawn go routine to read and process perf events,
// connected with main app through eventsQueue channel.
eventsQueue := make(chan *perf.Record, k.getRBQueueSize())
eventsQueue := make(chan *ringbuf.Record, k.getRBQueueSize())

// Listeners are ready and about to start reading from perf reader, tell
// user everything is ready.
Expand All @@ -237,7 +193,7 @@ func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error {
go func() {
defer wg.Done()
for stopCtx.Err() == nil {
record, err := perfReader.Read()
record, err := rd.Read()
if err != nil {
// NOTE(JM and Djalal): count and log errors while excluding the stopping context
if stopCtx.Err() == nil {
Expand All @@ -256,11 +212,6 @@ func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error {
k.recvCntr++
ringbufmetrics.PerfEventReceived.Inc()
}

if record.LostSamples > 0 {
atomic.AddUint64(&k.lostCntr, uint64(record.LostSamples))
ringbufmetrics.PerfEventLost.Add(float64(record.LostSamples))
}
}
}
}()
Expand Down Expand Up @@ -290,7 +241,7 @@ func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error {

// Wait for context to be cancelled and then stop.
<-stopCtx.Done()
return perfReader.Close()
return rd.Close()
}

// Observer represents the link between the BPF perf ring and the listeners. It
Expand Down
10 changes: 5 additions & 5 deletions pkg/testutils/perfring/perfring.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"testing"

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/perf"
"github.com/cilium/ebpf/ringbuf"
"github.com/cilium/tetragon/pkg/bpf"
testapi "github.com/cilium/tetragon/pkg/grpc/test"
"github.com/cilium/tetragon/pkg/logger"
Expand Down Expand Up @@ -69,7 +69,7 @@ func ProcessEvents(t *testing.T, ctx context.Context, eventFn EventFn, wgStarted
}
defer perfMap.Close()

perfReader, err := perf.NewReader(perfMap, 65535)
rd, err := ringbuf.NewReader(perfMap)
if err != nil {
t.Fatalf("creating perf array reader failed: %v", err)
}
Expand All @@ -96,7 +96,7 @@ func ProcessEvents(t *testing.T, ctx context.Context, eventFn EventFn, wgStarted
break
}

record, err := perfReader.Read()
record, err := rd.Read()
if err != nil {
if ctx.Err() == nil {
errChan <- fmt.Errorf("error reading perfring data: %v", err)
Expand Down Expand Up @@ -127,11 +127,11 @@ func ProcessEvents(t *testing.T, ctx context.Context, eventFn EventFn, wgStarted
case err := <-errChan:
t.Fatal(err)
case <-complChan:
perfReader.Close()
rd.Close()
return
case <-ctx.Done():
// Wait for context cancel.
perfReader.Close()
rd.Close()
return
}
}
Expand Down
5 changes: 0 additions & 5 deletions vendor/github.com/cilium/ebpf/perf/doc.go

This file was deleted.

Loading
Loading