From 2501f75150ff68f8d995096b1ccd25768a45730e Mon Sep 17 00:00:00 2001 From: Seonghoi Lee Date: Mon, 20 May 2024 10:55:50 +0900 Subject: [PATCH] feat: AddRingBuf() using ring_buffer__add libbpf provides ring_buffer__add to handle multiple ringbuf event callbacks - single RingBuffer object now handles multiple slots - implements AddRingBuf() for ring_buffer__add wrapper - updates ringbuffers selftest to show how to handle it Reference - https://github.com/torvalds/linux/blob/eb6a9339efeb6f3d2b5c86fdf2382cdc293eca2c/tools/testing/selftests/bpf/progs/test_ringbuf_multi.c - https://github.com/torvalds/linux/blob/eb6a9339efeb6f3d2b5c86fdf2382cdc293eca2c/tools/testing/selftests/bpf/prog_tests/ringbuf_multi.c#41 --- buf-ring.go | 27 ++++++++++++------- libbpfgo.c | 11 ++++++++ libbpfgo.h | 1 + module.go | 26 ++++++++++++++++++- selftest/ringbuffers/main.bpf.c | 13 ++++++++-- selftest/ringbuffers/main.go | 46 +++++++++++++++++++++++---------- 6 files changed, 98 insertions(+), 26 deletions(-) diff --git a/buf-ring.go b/buf-ring.go index 8e83b304..02b2ce11 100644 --- a/buf-ring.go +++ b/buf-ring.go @@ -19,7 +19,7 @@ import ( type RingBuffer struct { rb *C.struct_ring_buffer bpfMap *BPFMap - slot uint + slots []uint stop chan struct{} closed bool wg sync.WaitGroup @@ -50,20 +50,25 @@ func (rb *RingBuffer) Stop() { // may have stopped at this point. Failure to drain it will // result in a deadlock: the channel will fill up and the poll // goroutine will block in the callback. - eventChan := eventChannels.get(rb.slot).(chan []byte) - go func() { - // revive:disable:empty-block - for range eventChan { - } - // revive:enable:empty-block - }() + for _, slot := range rb.slots { + eventChan := eventChannels.get(slot).(chan []byte) + go func() { + // revive:disable:empty-block + for range eventChan { + } + // revive:enable:empty-block + }() + } // Wait for the poll goroutine to exit rb.wg.Wait() // Close the channel -- this is useful for the consumer but // also to terminate the drain goroutine above. - close(eventChan) + for _, slot := range rb.slots { + eventChan := eventChannels.get(slot).(chan []byte) + close(eventChan) + } // Reset pb.stop to allow multiple safe calls to Stop() rb.stop = nil @@ -76,7 +81,9 @@ func (rb *RingBuffer) Close() { rb.Stop() C.ring_buffer__free(rb.rb) - eventChannels.remove(rb.slot) + for _, slot := range rb.slots { + eventChannels.remove(slot) + } rb.closed = true } diff --git a/libbpfgo.c b/libbpfgo.c index 053711e1..3ba6dfa4 100644 --- a/libbpfgo.c +++ b/libbpfgo.c @@ -59,6 +59,17 @@ struct ring_buffer *cgo_init_ring_buf(int map_fd, uintptr_t ctx) return rb; } +int cgo_add_ring_buf(struct ring_buffer *rb, int map_fd, uintptr_t ctx) +{ + int ret = ring_buffer__add(rb, map_fd, ringbufferCallback, (void *) ctx); + if (ret != 0) { + fprintf(stderr, "Failed to add ring buffer: %s\n", strerror(-ret)); + return ret; + } + + return ret; +} + struct perf_buffer *cgo_init_perf_buf(int map_fd, int page_cnt, uintptr_t ctx) { struct perf_buffer_opts pb_opts = {}; diff --git a/libbpfgo.h b/libbpfgo.h index de936ec4..ac11de18 100644 --- a/libbpfgo.h +++ b/libbpfgo.h @@ -21,6 +21,7 @@ void cgo_libbpf_set_print_fn(); struct ring_buffer *cgo_init_ring_buf(int map_fd, uintptr_t ctx); +int cgo_add_ring_buf(struct ring_buffer *rb, int map_fd, uintptr_t ctx); struct perf_buffer *cgo_init_perf_buf(int map_fd, int page_cnt, uintptr_t ctx); void cgo_bpf_map__initial_value(struct bpf_map *map, void *value); diff --git a/module.go b/module.go index 35ae6509..e12778da 100644 --- a/module.go +++ b/module.go @@ -336,18 +336,42 @@ func (m *Module) InitRingBuf(mapName string, eventsChan chan []byte) (*RingBuffe rbC, errno := C.cgo_init_ring_buf(C.int(bpfMap.FileDescriptor()), C.uintptr_t(slot)) if rbC == nil { + eventChannels.remove(uint(slot)) return nil, fmt.Errorf("failed to initialize ring buffer: %w", errno) } ringBuf := &RingBuffer{ rb: rbC, bpfMap: bpfMap, - slot: uint(slot), + slots: []uint{uint(slot)}, } m.ringBufs = append(m.ringBufs, ringBuf) return ringBuf, nil } +func (m *Module) AddRingBuf(ringBuf *RingBuffer, mapName string, eventsChan chan []byte) (bool, error) { + bpfMap, err := m.GetMap(mapName) + if err != nil { + return false, err + } + + if eventsChan == nil { + return false, fmt.Errorf("events channel can not be nil") + } + + slot := eventChannels.put(eventsChan) + if slot == -1 { + return false, fmt.Errorf("max ring buffers reached") + } + ringBuf.slots = append(ringBuf.slots, uint(slot)) + + ret, errno := C.cgo_add_ring_buf(ringBuf.rb, C.int(bpfMap.FileDescriptor()), C.uintptr_t(slot)) + if ret != 0 { + return false, fmt.Errorf("failed to add ring buffer: %w", errno) + } + return true, nil +} + func (m *Module) InitPerfBuf(mapName string, eventsChan chan []byte, lostChan chan uint64, pageCnt int) (*PerfBuffer, error) { bpfMap, err := m.GetMap(mapName) if err != nil { diff --git a/selftest/ringbuffers/main.bpf.c b/selftest/ringbuffers/main.bpf.c index 0c8fab5e..32c848da 100644 --- a/selftest/ringbuffers/main.bpf.c +++ b/selftest/ringbuffers/main.bpf.c @@ -8,7 +8,7 @@ struct { __uint(type, BPF_MAP_TYPE_RINGBUF); __uint(max_entries, 1 << 24); -} events SEC(".maps"); +} events1 SEC(".maps"), events2 SEC(".maps"); long ringbuffer_flags = 0; @@ -18,13 +18,22 @@ int kprobe__sys_mmap(struct pt_regs *ctx) int *process; // Reserve space on the ringbuffer for the sample - process = bpf_ringbuf_reserve(&events, sizeof(int), ringbuffer_flags); + process = bpf_ringbuf_reserve(&events1, sizeof(int), ringbuffer_flags); if (!process) { return 0; } *process = 2021; + bpf_ringbuf_submit(process, ringbuffer_flags); + + process = bpf_ringbuf_reserve(&events2, sizeof(int), ringbuffer_flags); + if (!process) { + return 0; + } + + *process = 2024; + bpf_ringbuf_submit(process, ringbuffer_flags); return 0; } diff --git a/selftest/ringbuffers/main.go b/selftest/ringbuffers/main.go index cef4a97e..893bf165 100644 --- a/selftest/ringbuffers/main.go +++ b/selftest/ringbuffers/main.go @@ -15,7 +15,7 @@ import ( ) func resizeMap(module *bpf.Module, name string, size uint32) error { - m, err := module.GetMap("events") + m, err := module.GetMap(name) if err != nil { return err } @@ -39,7 +39,7 @@ func main() { } defer bpfModule.Close() - if err = resizeMap(bpfModule, "events", 8192); err != nil { + if err = resizeMap(bpfModule, "events1", 8192); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(-1) } @@ -58,32 +58,52 @@ func main() { os.Exit(-1) } - eventsChannel := make(chan []byte) - rb, err := bpfModule.InitRingBuf("events", eventsChannel) + eventsChannel1 := make(chan []byte) + rb, err := bpfModule.InitRingBuf("events1", eventsChannel1) if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(-1) } + eventsChannel2 := make(chan []byte) + ret, err := bpfModule.AddRingBuf(rb, "events2", eventsChannel2) + if ret == false { + fmt.Fprintln(os.Stderr, err) + os.Exit(-1) + } + rb.Poll(300) - numberOfEventsReceived := 0 + numberOfEvent1Received := 0 + numberOfEvent2Received := 0 go func() { for { syscall.Mmap(999, 999, 999, 1, 1) time.Sleep(time.Second / 2) } }() + recvLoop: for { - b := <-eventsChannel - if binary.LittleEndian.Uint32(b) != 2021 { - fmt.Fprintf(os.Stderr, "invalid data retrieved\n") - os.Exit(-1) - } - numberOfEventsReceived++ - if numberOfEventsReceived > 5 { - break recvLoop + select { + case b := <-eventsChannel1: + if binary.LittleEndian.Uint32(b) != 2021 { + fmt.Fprintf(os.Stderr, "invalid data retrieved\n") + os.Exit(-1) + } + numberOfEvent1Received++ + if numberOfEvent1Received > 5 && numberOfEvent2Received > 5 { + break recvLoop + } + case b := <-eventsChannel2: + if binary.LittleEndian.Uint32(b) != 2024 { + fmt.Fprintf(os.Stderr, "invalid data retrieved\n") + os.Exit(-1) + } + numberOfEvent2Received++ + if numberOfEvent1Received > 5 && numberOfEvent2Received > 5 { + break recvLoop + } } }