NETOBSERV-1995: Use global map, spinlock, split maps (#469)
* NETOBSERV-1987: split maps

Split BPF maps to avoid having one huge map for flows. This should move
us farther from the "stack size limit" that can show up when adding new
features.

This is also a prerequisite for later moving away from per-CPU maps,
which waste memory by duplicating flows across CPUs.
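
As a rough sketch of the difference (illustrative map names, not the exact
definitions from bpf/maps_definition.h): a per-CPU hash keeps one copy of the
value per CPU for every key, while a plain hash keeps a single shared value
that concurrent updates must guard:

```c
// Sketch only; assumes the usual BPF map boilerplate and the agent's
// flow_id / flow_metrics types.
struct {
    __uint(type, BPF_MAP_TYPE_PERCPU_HASH); // one flow_metrics copy per CPU per key
    __type(key, flow_id);
    __type(value, flow_metrics);
    __uint(max_entries, 1 << 24);
} percpu_flows_example SEC(".maps");

struct {
    __uint(type, BPF_MAP_TYPE_HASH); // a single shared flow_metrics per key
    __type(key, flow_id);
    __type(value, flow_metrics); // updates guarded by an embedded bpf_spin_lock
    __uint(max_entries, 1 << 24);
} global_flows_example SEC(".maps");
```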

Also:
- remove MAC and eth_protocol from the map key (they move to the flow
  metrics, i.e. the map values)
- in every hook, we no longer try to create the flow if it didn't
  exist; we assume it should exist (see the sketch after this list).
  Note that we were previously adding byte/packet counters from hooks,
  which doesn't seem correct, as counting bytes/packets is the role of
  the TC hook only; otherwise it can lead to overcounts
- remove the code that sets MIN_RTT on TCP flows (not sure why we had
  that)
- small refactoring of the "Accumulate" functions to adapt to the new
  model
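
A minimal sketch of that lookup-only pattern, assuming the TC hook is the
sole creator of entries in the main flow map (variable names follow the code
below):

```c
// Inside a non-TC hook, with a populated flow_id `id`: enrich the existing
// entry, never create it, and never touch the byte/packet counters.
u64 now = bpf_ktime_get_ns();
flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow == NULL) {
    return 0; // the TC hook hasn't recorded this flow; nothing to enrich
}
bpf_spin_lock(&aggregate_flow->lock);
aggregate_flow->end_mono_time_ts = now; // hook-specific enrichment only
bpf_spin_unlock(&aggregate_flow->lock);
```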

Use global map & spinlock

regen .o

Remove packing attributes

The packed attribute makes the linter unhappy with the spinlock.
It also turns out that removing it improves performance quite a bit:
my tests show another 20% gain.
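
As I understand it, the clash looks roughly like this (a sketch with assumed
field names, not the exact flow_metrics layout): the kernel requires the
struct bpf_spin_lock member to sit at a 4-byte-aligned offset, which
__attribute__((packed)) can break, and packed layouts also force slower
unaligned accesses on the hot path:

```c
typedef struct flow_metrics_t {
    struct bpf_spin_lock lock; // must keep its natural 4-byte alignment
    u64 bytes;
    u32 packets;
    u16 flags;
    // ... other fields
} flow_metrics; // note: no __attribute__((packed)) anymore
```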

Remove low-value test for binary read

TestRecordBinaryEncoding only tests ReadFrom, which is basically a
simple call to binary.Read, so we were essentially testing the
encoding/binary standard library.

Avoid unnecessary indirections & copies of MAC

Move DNS to secondary map, revert types.h reorg

Fix DNS issue: retry in case of a concurrent write

Also add an error counter for DNS map updates
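
Condensed from the flows.c hunk below, the create-or-retry pattern is
roughly:

```c
long ret = bpf_map_update_elem(&additional_flow_metrics, &id, &new_metrics, BPF_NOEXIST);
if (ret == -EEXIST) {
    // Another CPU inserted the entry first: retry by updating that entry.
    additional_metrics *extra_metrics =
        bpf_map_lookup_elem(&additional_flow_metrics, &id);
    if (extra_metrics != NULL) {
        update_dns(extra_metrics, &pkt, dns_errno);
    } else {
        increase_counter(HASHMAP_FAIL_UPDATE_DNS); // entry vanished again: count it
    }
} else if (ret != 0) {
    increase_counter(HASHMAP_FAIL_UPDATE_DNS); // genuine update error
}
```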

* Address review comments (renamings)

* Make BpfFlowMetrics a pointer in records

* Reinsert binary encoding test

* Remove unnecessary checks

Checking for zero values only made sense with per-CPU maps

* update architecture doc

* kernel space mermaid arch

* Update architecture.md
jotak authored Dec 12, 2024
1 parent 2ab577a commit 70003f2
Showing 44 changed files with 1,409 additions and 972 deletions.
4 changes: 2 additions & 2 deletions .golangci.yml
@@ -16,9 +16,9 @@ linters:
- stylecheck
- typecheck
- unused
run:
go: "1.22"
linters-settings:
stylecheck:
go: "1.22"
gocritic:
enabled-checks:
- hugeParam
1 change: 1 addition & 0 deletions Makefile
@@ -106,6 +106,7 @@ prereqs: ## Check if prerequisites are met, and install missing dependencies
fmt: ## Run go fmt against code.
@echo "### Formatting code"
go fmt ./...
find ./bpf -type f -not -path "./bpf/headers/*" -name "*.[ch]" | xargs clang-format -i --Werror

.PHONY: lint
lint: prereqs ## Lint the code
92 changes: 66 additions & 26 deletions bpf/flows.c
@@ -51,22 +51,25 @@
*/
#include "network_events_monitoring.h"

static inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt, int dns_errno,
u64 len) {
static inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt, u64 len) {
bpf_spin_lock(&aggregate_flow->lock);
aggregate_flow->packets += 1;
aggregate_flow->bytes += len;
aggregate_flow->end_mono_time_ts = pkt->current_ts;
// it might happen that start_mono_time hasn't been set due to
// the way percpu hashmap deal with concurrent map entries
if (aggregate_flow->start_mono_time_ts == 0) {
aggregate_flow->start_mono_time_ts = pkt->current_ts;
}
aggregate_flow->flags |= pkt->flags;
aggregate_flow->dscp = pkt->dscp;
aggregate_flow->dns_record.id = pkt->dns_id;
aggregate_flow->dns_record.flags = pkt->dns_flags;
aggregate_flow->dns_record.latency = pkt->dns_latency;
aggregate_flow->dns_record.errno = dns_errno;
bpf_spin_unlock(&aggregate_flow->lock);
}

static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt, int dns_errno) {
if (pkt->dns_id != 0) {
extra_metrics->dns_record.id = pkt->dns_id;
extra_metrics->dns_record.flags = pkt->dns_flags;
extra_metrics->dns_record.latency = pkt->dns_latency;
}
if (dns_errno != 0) {
extra_metrics->dns_record.errno = dns_errno;
}
}

static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
@@ -76,6 +79,9 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
return TC_ACT_OK;
}
do_sampling = 1;

u16 eth_protocol = 0;

pkt_info pkt;
__builtin_memset(&pkt, 0, sizeof(pkt));

@@ -90,7 +96,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
struct ethhdr *eth = (struct ethhdr *)data;
u64 len = skb->len;

if (fill_ethhdr(eth, data_end, &pkt) == DISCARD) {
if (fill_ethhdr(eth, data_end, &pkt, &eth_protocol) == DISCARD) {
return TC_ACT_OK;
}

@@ -99,7 +105,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
id.direction = direction;

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0);
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol);
if (skip) {
return TC_ACT_OK;
}
@@ -108,30 +114,22 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
if (enable_dns_tracking) {
dns_errno = track_dns_packet(skb, &pkt);
}
// TODO: we need to add spinlock here when we deprecate versions prior to 5.1, or provide
// a spinlocked alternative version and use it selectively https://lwn.net/Articles/779120/
flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
update_existing_flow(aggregate_flow, &pkt, dns_errno, len);
update_existing_flow(aggregate_flow, &pkt, len);
} else {
// Key does not exist in the map, and will need to create a new entry.
u64 rtt = 0;
if (enable_rtt && id.transport_protocol == IPPROTO_TCP) {
rtt = MIN_RTT;
}
flow_metrics new_flow = {
.packets = 1,
.bytes = len,
.eth_protocol = eth_protocol,
.start_mono_time_ts = pkt.current_ts,
.end_mono_time_ts = pkt.current_ts,
.flags = pkt.flags,
.dscp = pkt.dscp,
.dns_record.id = pkt.dns_id,
.dns_record.flags = pkt.dns_flags,
.dns_record.latency = pkt.dns_latency,
.dns_record.errno = dns_errno,
.flow_rtt = rtt,
};
__builtin_memcpy(new_flow.dst_mac, eth->h_dest, ETH_ALEN);
__builtin_memcpy(new_flow.src_mac, eth->h_source, ETH_ALEN);

long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_NOEXIST);
if (ret != 0) {
@@ -142,7 +140,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
flow_metrics *aggregate_flow =
(flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
update_existing_flow(aggregate_flow, &pkt, dns_errno, len);
update_existing_flow(aggregate_flow, &pkt, len);
} else {
if (trace_messages) {
bpf_printk("failed to update an exising flow\n");
@@ -171,6 +169,48 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
}
}
}

// Update additional metrics (per-CPU map)
if (pkt.dns_id != 0 || dns_errno != 0) {
// hack on id will be removed with dedup-in-kernel work
id.direction = 0;
id.if_index = 0;
additional_metrics *extra_metrics =
(additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id);
if (extra_metrics != NULL) {
update_dns(extra_metrics, &pkt, dns_errno);
} else {
additional_metrics new_metrics = {
.dns_record.id = pkt.dns_id,
.dns_record.flags = pkt.dns_flags,
.dns_record.latency = pkt.dns_latency,
.dns_record.errno = dns_errno,
};
long ret =
bpf_map_update_elem(&additional_flow_metrics, &id, &new_metrics, BPF_NOEXIST);
if (ret != 0) {
if (trace_messages && ret != -EEXIST) {
bpf_printk("error adding DNS %d\n", ret);
}
if (ret == -EEXIST) {
// Concurrent write from another CPU; retry
additional_metrics *extra_metrics =
(additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id);
if (extra_metrics != NULL) {
update_dns(extra_metrics, &pkt, dns_errno);
} else {
if (trace_messages) {
bpf_printk("failed to update DNS\n");
}
increase_counter(HASHMAP_FAIL_UPDATE_DNS);
}
} else {
increase_counter(HASHMAP_FAIL_UPDATE_DNS);
}
}
}
}

return TC_ACT_OK;
}

13 changes: 7 additions & 6 deletions bpf/flows_filter.h
@@ -202,9 +202,10 @@ static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_
}

static __always_inline int flow_filter_setup_lookup_key(flow_id *id, struct filter_key_t *key,
u8 *len, u8 *offset, bool use_src_ip) {
u8 *len, u8 *offset, bool use_src_ip,
u16 eth_protocol) {

if (id->eth_protocol == ETH_P_IP) {
if (eth_protocol == ETH_P_IP) {
*len = sizeof(u32);
*offset = sizeof(ip4in6);
if (use_src_ip) {
@@ -213,7 +214,7 @@ static __always_inline int flow_filter_setup_lookup_key(flow_id *id, struct filt
__builtin_memcpy(key->ip_data, id->dst_ip + *offset, *len);
}
key->prefix_len = 32;
} else if (id->eth_protocol == ETH_P_IPV6) {
} else if (eth_protocol == ETH_P_IPV6) {
*len = IP_MAX_LEN;
*offset = 0;
if (use_src_ip) {
Expand All @@ -232,7 +233,7 @@ static __always_inline int flow_filter_setup_lookup_key(flow_id *id, struct filt
* check if the flow match filter rule and return >= 1 if the flow is to be dropped
*/
static __always_inline int is_flow_filtered(flow_id *id, filter_action *action, u16 flags,
u32 drop_reason) {
u32 drop_reason, u16 eth_protocol) {
struct filter_key_t key;
u8 len, offset;
int result = 0;
@@ -241,7 +242,7 @@ static __always_inline int is_flow_filtered(flow_id *id, filter_action *action,
*action = MAX_FILTER_ACTIONS;

// Lets do first CIDR match using srcIP.
result = flow_filter_setup_lookup_key(id, &key, &len, &offset, true);
result = flow_filter_setup_lookup_key(id, &key, &len, &offset, true, eth_protocol);
if (result < 0) {
return result;
}
@@ -253,7 +254,7 @@ static __always_inline int is_flow_filtered(flow_id *id, filter_action *action,
}

// if we can't find a match then Lets do second CIDR match using dstIP.
result = flow_filter_setup_lookup_key(id, &key, &len, &offset, false);
result = flow_filter_setup_lookup_key(id, &key, &len, &offset, false, eth_protocol);
if (result < 0) {
return result;
}
11 changes: 10 additions & 1 deletion bpf/maps_definition.h
@@ -11,13 +11,22 @@ struct {

// Key: the flow identifier. Value: the flow metrics for that identifier.
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_HASH);
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, flow_id);
__type(value, flow_metrics);
__uint(max_entries, 1 << 24);
__uint(map_flags, BPF_F_NO_PREALLOC);
} aggregated_flows SEC(".maps");

// Key: the flow identifier. Value: extra metrics for that identifier.
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_HASH);
__type(key, flow_id);
__type(value, additional_metrics);
__uint(max_entries, 1 << 24);
__uint(map_flags, BPF_F_NO_PREALLOC);
} additional_flow_metrics SEC(".maps");

//PerfEvent Array for Packet Payloads
struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
35 changes: 11 additions & 24 deletions bpf/network_events_monitoring.h
@@ -35,15 +35,14 @@ static inline int lookup_and_update_existing_flow_network_events(flow_id *id, u8

bpf_probe_read(cookie, md_len, user_cookie);

flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, id);
if (aggregate_flow != NULL) {
u8 idx = aggregate_flow->network_events_idx;
aggregate_flow->end_mono_time_ts = bpf_ktime_get_ns();
additional_metrics *extra_metrics = bpf_map_lookup_elem(&additional_flow_metrics, id);
if (extra_metrics != NULL) {
u8 idx = extra_metrics->network_events_idx;
// Needed to check length here again to keep JIT verifier happy
if (idx < MAX_NETWORK_EVENTS && md_len <= MAX_EVENT_MD) {
if (!md_already_exists(aggregate_flow->network_events, (u8 *)cookie)) {
__builtin_memcpy(aggregate_flow->network_events[idx], cookie, MAX_EVENT_MD);
aggregate_flow->network_events_idx = (idx + 1) % MAX_NETWORK_EVENTS;
if (!md_already_exists(extra_metrics->network_events, (u8 *)cookie)) {
__builtin_memcpy(extra_metrics->network_events[idx], cookie, MAX_EVENT_MD);
extra_metrics->network_events_idx = (idx + 1) % MAX_NETWORK_EVENTS;
}
return 0;
}
@@ -53,10 +52,9 @@

static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_metadata *md) {
u8 dscp = 0, protocol = 0, md_len = 0;
u16 family = 0, flags = 0;
u16 family = 0, flags = 0, eth_protocol = 0;
u8 *user_cookie = NULL;
long ret = 0;
u64 len = 0;
flow_id id;

__builtin_memset(&id, 0, sizeof(id));
@@ -67,12 +65,8 @@
return -1;
}

id.if_index = BPF_CORE_READ(md, in_ifindex);

len = BPF_CORE_READ(skb, len);

// read L2 info
core_fill_in_l2(skb, &id, &family);
core_fill_in_l2(skb, &eth_protocol, &family);

// read L3 info
core_fill_in_l3(skb, &id, family, &protocol, &dscp);
@@ -99,7 +93,7 @@
}

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, flags, 0);
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol);
if (skip) {
return 0;
}
@@ -113,19 +107,12 @@
}

// there is no matching flows so lets create new one and add the network event metadata
u64 current_time = bpf_ktime_get_ns();
id.direction = INGRESS;
flow_metrics new_flow = {
.packets = 1,
.bytes = len,
.start_mono_time_ts = current_time,
.end_mono_time_ts = current_time,
.flags = flags,
additional_metrics new_flow = {
.network_events_idx = 0,
};
bpf_probe_read(new_flow.network_events[0], md_len, user_cookie);
new_flow.network_events_idx++;
ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_NOEXIST);
ret = bpf_map_update_elem(&additional_flow_metrics, &id, &new_flow, BPF_NOEXIST);
if (ret != 0) {
if (trace_messages && ret != -EEXIST) {
bpf_printk("error network events creating new flow %d\n", ret);
5 changes: 3 additions & 2 deletions bpf/pca.h
@@ -41,14 +41,15 @@ static inline bool validate_pca_filter(struct __sk_buff *skb, direction dir) {
__builtin_memset(&pkt, 0, sizeof(pkt));
flow_id id;
__builtin_memset(&id, 0, sizeof(id));
u16 eth_protocol = 0;

pkt.id = &id;

void *data_end = (void *)(long)skb->data_end;
void *data = (void *)(long)skb->data;
struct ethhdr *eth = (struct ethhdr *)data;

if (fill_ethhdr(eth, data_end, &pkt) == DISCARD) {
if (fill_ethhdr(eth, data_end, &pkt, &eth_protocol) == DISCARD) {
return false;
}

@@ -57,7 +58,7 @@ static inline bool validate_pca_filter(struct __sk_buff *skb, direction dir) {
id.direction = dir;

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0);
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol);
if (skip) {
return false;
}
