diff --git a/bpf/flows.c b/bpf/flows.c index b3af20b1..c9b80f49 100644 --- a/bpf/flows.c +++ b/bpf/flows.c @@ -6,13 +6,10 @@ to/from an interface. Logic: - 1) Store flow information in a per-cpu hash map. - 2) Upon flow completion (tcp->fin event), evict the entry from map, and - send to userspace through ringbuffer. - Eviction for non-tcp flows need to done by userspace - 3) When the map is full, we send the new flow entry to userspace via ringbuffer, + 1) Store flow information in a hash map. + 2) Periodically evict the entry from map from userspace. + 3) When the map is full/busy, we send the new flow entry to userspace via ringbuffer, until an entry is available. - 4) When hash collision is detected, we send the new entry to userpace via ringbuffer. */ #include #include @@ -55,16 +52,47 @@ */ #include "pkt_translation.h" -static inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt, u64 len, - u32 sampling) { - bpf_spin_lock(&aggregate_flow->lock); - aggregate_flow->packets += 1; - aggregate_flow->bytes += len; - aggregate_flow->end_mono_time_ts = pkt->current_ts; - aggregate_flow->flags |= pkt->flags; - aggregate_flow->dscp = pkt->dscp; - aggregate_flow->sampling = sampling; - bpf_spin_unlock(&aggregate_flow->lock); +static inline void add_observed_intf(flow_metrics *value, pkt_info *pkt, u32 if_index, + u8 direction) { + u8 nb_observed_intf = value->nb_observed_intf; + if (nb_observed_intf < MAX_OBSERVED_INTERFACES) { + for (u8 i = 0; i < nb_observed_intf; i++) { + if (value->observed_intf[i].if_index == if_index && + value->observed_intf[i].direction == direction) { + return; + } + } + bpf_spin_lock(&value->lock); + value->observed_intf[nb_observed_intf].if_index = if_index; + value->observed_intf[nb_observed_intf].direction = direction; + value->nb_observed_intf++; + // We can also update end time / flags regardless of which interface is used + value->end_mono_time_ts = pkt->current_ts; + value->flags |= pkt->flags; + bpf_spin_unlock(&value->lock); + } else { + increase_counter(OBSERVED_INTF_MISSED); + BPF_PRINTK("observed interface missed (array capacity reached) for ifindex %d\n", if_index); + } +} + +static __always_inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt, + u64 len, u32 sampling, u32 if_index, + u8 direction) { + // Count only packets seen from the same interface as previously to avoid duplicate counts + if (aggregate_flow->if_index_first_seen == if_index) { + bpf_spin_lock(&aggregate_flow->lock); + aggregate_flow->packets += 1; + aggregate_flow->bytes += len; + aggregate_flow->end_mono_time_ts = pkt->current_ts; + aggregate_flow->flags |= pkt->flags; + aggregate_flow->dscp = pkt->dscp; + aggregate_flow->sampling = sampling; + bpf_spin_unlock(&aggregate_flow->lock); + } else if (if_index != 0) { + // Only add info that we've seen this interface + add_observed_intf(aggregate_flow, pkt, if_index, direction); + } } static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt, int dns_errno) { @@ -79,23 +107,6 @@ static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt, } } -static inline void add_observed_intf(flow_metrics *value, u32 if_index, u8 direction) { - if (value->nb_observed_intf < MAX_OBSERVED_INTERFACES) { - for (u8 i = 0; i < value->nb_observed_intf; i++) { - if (value->observed_intf[i].if_index == if_index && - value->observed_intf[i].direction == direction) { - return; - } - } - value->observed_intf[value->nb_observed_intf].if_index = if_index; - value->observed_intf[value->nb_observed_intf].direction = direction; - value->nb_observed_intf++; - } else { - increase_counter(OBSERVED_INTF_MISSED); - BPF_PRINTK("observed interface missed (array capacity reached) for ifindex %d\n", if_index); - } -} - static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { if (!has_filter_sampling) { // When no filter sampling is defined, run the sampling check at the earliest for better performances @@ -151,12 +162,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { } flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id); if (aggregate_flow != NULL) { - if (aggregate_flow->if_index_first_seen == skb->ifindex) { - update_existing_flow(aggregate_flow, &pkt, len, filter_sampling); - } else if (skb->ifindex != 0) { - // Only add info that we've seen this interface - add_observed_intf(aggregate_flow, skb->ifindex, direction); - } + update_existing_flow(aggregate_flow, &pkt, len, filter_sampling, skb->ifindex, direction); } else { // Key does not exist in the map, and will need to create a new entry. flow_metrics new_flow; @@ -183,7 +189,8 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id); if (aggregate_flow != NULL) { - update_existing_flow(aggregate_flow, &pkt, len, filter_sampling); + update_existing_flow(aggregate_flow, &pkt, len, filter_sampling, skb->ifindex, + direction); } else { if (trace_messages) { bpf_printk("failed to update an exising flow\n"); diff --git a/pkg/ebpf/bpf_arm64_bpfel.o b/pkg/ebpf/bpf_arm64_bpfel.o index 62b8c401..91ab0593 100644 Binary files a/pkg/ebpf/bpf_arm64_bpfel.o and b/pkg/ebpf/bpf_arm64_bpfel.o differ diff --git a/pkg/ebpf/bpf_powerpc_bpfel.o b/pkg/ebpf/bpf_powerpc_bpfel.o index fa7440eb..c37133c1 100644 Binary files a/pkg/ebpf/bpf_powerpc_bpfel.o and b/pkg/ebpf/bpf_powerpc_bpfel.o differ diff --git a/pkg/ebpf/bpf_s390_bpfeb.o b/pkg/ebpf/bpf_s390_bpfeb.o index 9030d83c..e4d1e95b 100644 Binary files a/pkg/ebpf/bpf_s390_bpfeb.o and b/pkg/ebpf/bpf_s390_bpfeb.o differ diff --git a/pkg/ebpf/bpf_x86_bpfel.o b/pkg/ebpf/bpf_x86_bpfel.o index 63bbed34..45214a67 100644 Binary files a/pkg/ebpf/bpf_x86_bpfel.o and b/pkg/ebpf/bpf_x86_bpfel.o differ