lock for observed interface update
commit 0921905 · 1 parent cf22ec7 · jotak committed Jan 16, 2025
Showing 5 changed files with 47 additions and 40 deletions.
bpf/flows.c — 87 changes: 47 additions & 40 deletions
@@ -6,13 +6,10 @@
     to/from an interface.
     Logic:
-        1) Store flow information in a per-cpu hash map.
-        2) Upon flow completion (tcp->fin event), evict the entry from map, and
-           send to userspace through ringbuffer.
-           Eviction for non-tcp flows need to done by userspace
-        3) When the map is full, we send the new flow entry to userspace via ringbuffer,
+        1) Store flow information in a hash map.
+        2) Periodically evict the entry from map from userspace.
+        3) When the map is full/busy, we send the new flow entry to userspace via ringbuffer,
            until an entry is available.
         4) When hash collision is detected, we send the new entry to userpace via ringbuffer.
 */
 #include <vmlinux.h>
 #include <bpf_helpers.h>
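
The rewritten header comment reflects the current lifecycle: flows accumulate in a hash map and userspace drains it periodically, with the ringbuffer as the overflow path. As a rough sketch of step 2 only, here is what a periodic userspace eviction pass could look like using libbpf's map syscall wrappers. The flow_id and flow_metrics layouts, the map fd plumbing, and export_flow() are hypothetical stand-ins, not code from this commit (the agent's own userspace lives in Go under pkg/):

#include <stdbool.h>
#include <bpf/bpf.h>

// Placeholder layouts; the real structs are generated from the BPF code.
typedef struct { unsigned char raw[64]; } flow_id;
typedef struct { unsigned char raw[256]; } flow_metrics;

void export_flow(const flow_id *id, const flow_metrics *m); // hypothetical exporter hook

// Hedged illustration of step 2 (periodic eviction from userspace).
static void evict_flows(int map_fd) {
    flow_id key, next_key;
    flow_metrics metrics;
    bool has_next = (bpf_map_get_next_key(map_fd, NULL, &next_key) == 0);
    while (has_next) {
        key = next_key;
        // Advance the cursor before deleting: deleting the current key
        // would otherwise restart iteration from the head of the map.
        has_next = (bpf_map_get_next_key(map_fd, &key, &next_key) == 0);
        if (bpf_map_lookup_elem(map_fd, &key, &metrics) == 0) {
            export_flow(&key, &metrics);
            bpf_map_delete_elem(map_fd, &key);
        }
    }
}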
@@ -55,16 +52,47 @@
 */
 #include "pkt_translation.h"
 
-static inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt, u64 len,
-                                        u32 sampling) {
-    bpf_spin_lock(&aggregate_flow->lock);
-    aggregate_flow->packets += 1;
-    aggregate_flow->bytes += len;
-    aggregate_flow->end_mono_time_ts = pkt->current_ts;
-    aggregate_flow->flags |= pkt->flags;
-    aggregate_flow->dscp = pkt->dscp;
-    aggregate_flow->sampling = sampling;
-    bpf_spin_unlock(&aggregate_flow->lock);
+static inline void add_observed_intf(flow_metrics *value, pkt_info *pkt, u32 if_index,
+                                     u8 direction) {
+    u8 nb_observed_intf = value->nb_observed_intf;
+    if (nb_observed_intf < MAX_OBSERVED_INTERFACES) {
+        for (u8 i = 0; i < nb_observed_intf; i++) {
+            if (value->observed_intf[i].if_index == if_index &&
+                value->observed_intf[i].direction == direction) {
+                return;
+            }
+        }
+        bpf_spin_lock(&value->lock);
+        value->observed_intf[nb_observed_intf].if_index = if_index;
+        value->observed_intf[nb_observed_intf].direction = direction;
+        value->nb_observed_intf++;
+        // We can also update end time / flags regardless of which interface is used
+        value->end_mono_time_ts = pkt->current_ts;
+        value->flags |= pkt->flags;
+        bpf_spin_unlock(&value->lock);
+    } else {
+        increase_counter(OBSERVED_INTF_MISSED);
+        BPF_PRINTK("observed interface missed (array capacity reached) for ifindex %d\n", if_index);
+    }
+}
+
+static __always_inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt,
+                                                 u64 len, u32 sampling, u32 if_index,
+                                                 u8 direction) {
+    // Count only packets seen from the same interface as previously to avoid duplicate counts
+    if (aggregate_flow->if_index_first_seen == if_index) {
+        bpf_spin_lock(&aggregate_flow->lock);
+        aggregate_flow->packets += 1;
+        aggregate_flow->bytes += len;
+        aggregate_flow->end_mono_time_ts = pkt->current_ts;
+        aggregate_flow->flags |= pkt->flags;
+        aggregate_flow->dscp = pkt->dscp;
+        aggregate_flow->sampling = sampling;
+        bpf_spin_unlock(&aggregate_flow->lock);
+    } else if (if_index != 0) {
+        // Only add info that we've seen this interface
+        add_observed_intf(aggregate_flow, pkt, if_index, direction);
+    }
+}
 
 static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt, int dns_errno) {
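
For orientation, the fields these two helpers touch imply a per-flow map value shaped roughly as below. This is a hypothetical reconstruction inferred from the diff; field widths, ordering, and the MAX_OBSERVED_INTERFACES value are assumptions, and the real definition lives in the repository's shared headers:

#define MAX_OBSERVED_INTERFACES 6 // assumed capacity, not confirmed by this diff

typedef struct observed_intf_t {
    u32 if_index;
    u8 direction;
} observed_intf_t;

typedef struct flow_metrics_t {
    struct bpf_spin_lock lock;  // taken by both helpers before mutating shared fields
    u32 if_index_first_seen;    // only this interface increments packets/bytes
    u8 nb_observed_intf;        // occupancy of the observed_intf array below
    observed_intf_t observed_intf[MAX_OBSERVED_INTERFACES];
    u64 packets;
    u64 bytes;
    u64 end_mono_time_ts;       // refreshed on every matching packet
    u16 flags;                  // OR-accumulated TCP flags
    u8 dscp;
    u32 sampling;
} flow_metrics;

The kernel allows only one struct bpf_spin_lock per map value, which is presumably why a single lock guards both the counters and the observed-interfaces array.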
@@ -79,23 +107,6 @@ static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt,
     }
 }
 
-static inline void add_observed_intf(flow_metrics *value, u32 if_index, u8 direction) {
-    if (value->nb_observed_intf < MAX_OBSERVED_INTERFACES) {
-        for (u8 i = 0; i < value->nb_observed_intf; i++) {
-            if (value->observed_intf[i].if_index == if_index &&
-                value->observed_intf[i].direction == direction) {
-                return;
-            }
-        }
-        value->observed_intf[value->nb_observed_intf].if_index = if_index;
-        value->observed_intf[value->nb_observed_intf].direction = direction;
-        value->nb_observed_intf++;
-    } else {
-        increase_counter(OBSERVED_INTF_MISSED);
-        BPF_PRINTK("observed interface missed (array capacity reached) for ifindex %d\n", if_index);
-    }
-}
-
 static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
     if (!has_filter_sampling) {
         // When no filter sampling is defined, run the sampling check at the earliest for better performances
@@ -151,12 +162,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
     }
     flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
     if (aggregate_flow != NULL) {
-        if (aggregate_flow->if_index_first_seen == skb->ifindex) {
-            update_existing_flow(aggregate_flow, &pkt, len, filter_sampling);
-        } else if (skb->ifindex != 0) {
-            // Only add info that we've seen this interface
-            add_observed_intf(aggregate_flow, skb->ifindex, direction);
-        }
+        update_existing_flow(aggregate_flow, &pkt, len, filter_sampling, skb->ifindex, direction);
     } else {
         // Key does not exist in the map, and will need to create a new entry.
         flow_metrics new_flow;
@@ -183,7 +189,8 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
             flow_metrics *aggregate_flow =
                 (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
             if (aggregate_flow != NULL) {
-                update_existing_flow(aggregate_flow, &pkt, len, filter_sampling);
+                update_existing_flow(aggregate_flow, &pkt, len, filter_sampling, skb->ifindex,
+                                     direction);
             } else {
                 if (trace_messages) {
                     bpf_printk("failed to update an exising flow\n");
Binary file modified pkg/ebpf/bpf_arm64_bpfel.o
Binary file modified pkg/ebpf/bpf_powerpc_bpfel.o
Binary file modified pkg/ebpf/bpf_s390_bpfeb.o
Binary file modified pkg/ebpf/bpf_x86_bpfel.o
