refactor: flowtable direction logic and other fixes (#382)
Functional change:
- Instead of enabling the flowtable timer expiration process only in tests, create a flowtable timer node that is always enabled. It is implemented as a process node that triggers interrupts in a per-worker input node, which performs the timer wheel rotation; a condensed sketch follows this item. For now this is only a backup mechanism for low-traffic situations, and flow timers are still expected to expire mostly in the flow node, so the wheel rotation frequency is low. To switch entirely to the timer expiration node, the time granularity would have to be increased so that timers are spread over time rather than bursting every second.
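
  A condensed sketch of that mechanism, assuming the node names and the FLOWTABLE_TIMER_FREQUENCY constant introduced by this commit (the placeholder value below is an assumption; the real implementation is in the upf/flowtable.c hunk further down):

  #include <vlib/vlib.h>

  #ifndef FLOWTABLE_TIMER_FREQUENCY
  #define FLOWTABLE_TIMER_FREQUENCY 1 /* placeholder; real value comes from the commit's headers */
  #endif

  extern vlib_node_registration_t upf_flowtable_timer_wk_process_node;

  /* Backup timer driver: a main-thread process node that periodically raises
   * an interrupt on each worker's "upf-flowtable-wk-timer" input node. The
   * interrupted input node then calls flowtable_timer_wheel_index_update()
   * and flowtable_timer_expire() for its own per-CPU timer wheel. */
  static uword
  upf_flowtable_timer_process_sketch (vlib_main_t *vm, vlib_node_runtime_t *rt,
                                      vlib_frame_t *f)
  {
    (void) rt;
    (void) f;
    while (1)
      {
        /* low wakeup frequency: this only backs up the flow node for
         * low-traffic situations */
        vlib_process_wait_for_event_or_clock (vm,
                                              1.0 / FLOWTABLE_TIMER_FREQUENCY);

        /* send an interrupt to the per-worker timer input node on every thread */
        for (int ti = 0; ti < vlib_get_n_threads (); ti++)
          vlib_node_set_interrupt_pending (
            vlib_get_main_by_index (ti),
            upf_flowtable_timer_wk_process_node.index);
      }
    return 0;
  }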

Changes related to flowtable, but not affecting behavior:
- In the flow structure, do not use the reversed ordering taken from the flow hash key. This eliminates a lot of xoring of the direction with the flow's reverse mark and makes the code much clearer. Matching in the hashtable is still done with an "ordered" key whose ordering depends on the IP addresses; the flow's internal IP and port fields are swapped if needed on flow creation and flow removal. Flow fields can then be accessed directly by direction without additional xoring (see the sketch after this list).
- Move the per-direction fields of a flow into a separate struct for better logical separation. There is no performance advantage in keeping them as SoA instead of AoS, since the fields are accessed in random order on the hot path, and any performance difference is within the margin of error.
- In some places, replace FT_ORIGIN/FT_REVERSE with more meaningful enums for the different operations.
- Remove the flow event system and replace it with explicit calls to the event handlers.
- Move upf_buffer_opaque_t into its own header to resolve a dependency and allow its use in the flowtable code.
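
  A rough illustration of the new per-direction layout and direction-indexed access. Field names follow the flowtable.c hunks below; the real flow_side_t/flow_entry_t in upf/flowtable.h have more members, the direction enum name is assumed, and the old macro expansion shown in the comment is only approximate:

  #include <stdint.h>

  typedef uint32_t u32;
  typedef uint64_t u64;

  /* name assumed; the diff also uses flow_direction_op_t for key operations */
  typedef enum
  {
    FT_ORIGIN = 0,
    FT_REVERSE = 1,
  } flow_direction_t;

  /* per-direction state grouped into one struct (AoS instead of SoA) */
  typedef struct
  {
    u32 pdr_id;
    u32 teid;
    u32 next;
    struct { u32 conn_index; u32 thread_index; } tcp;
    struct { u32 last_exported; u32 info_index; } ipfix;
    struct { u64 pkts; u64 bytes; } stats;
  } flow_side_t;

  typedef struct
  {
    flow_side_t side[2]; /* indexed directly by FT_ORIGIN / FT_REVERSE */
    /* ... the rest of flow_entry_t elided ... */
  } flow_entry_t;

  /* new style: the requested direction indexes the side array directly */
  #define flow_side(f, direction) (&(f)->side[(direction)])

  /* old style (approximate): accessors xored the requested direction with
   * the flow's reverse mark before indexing per-direction arrays, e.g.
   *   #define flow_pdr_id(f, dir) ((f)->pdr_id[(dir) ^ (f)->is_reverse]) */

  static u32
  example_forward_pdr (flow_entry_t *f)
  {
    return flow_side (f, FT_ORIGIN)->pdr_id;
  }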

Unrelated changes:
- Remove the "upf-ipX-proxy-server-no-conn-output" nodes, since they make no sense in newer VPP versions (as was already noted in comments); there was also a bug where the direction variable was incorrectly reused between packets within one vector.
- Disable and deprecate the "polling" mode for PFCP. UPG supports interrupts without additional configuration when all VPP nodes are in interrupt mode. The exception is VPP v22.10, which has a bug that prevents enabling interrupt mode; it is fixed by VPP commit 5f30518.
- Fix a VPP hang that occurs when proxy traffic loops because a PDI/FAR is configured incorrectly, by handling the TTL (a sketch follows this list).
- Simplify a possible optimization of ACL matching by not matching the spare fields in the IPv4 case.
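
  The TTL handling mentioned above is not part of the hunks shown on this page, so the following is only a hedged sketch of the general idea, with a hypothetical helper name and a simplified IPv4 header:

  #include <stdbool.h>
  #include <stdint.h>

  /* simplified stand-in for ip4_header_t; only the field needed here */
  typedef struct
  {
    uint8_t ttl;
  } ip4_header_sketch_t;

  /* Drop proxied packets once their TTL is exhausted, so that a PDI/FAR
   * misconfiguration which loops proxy traffic back into the node cannot
   * keep a packet circulating forever and hang VPP.
   * Returns true if the packet may still be forwarded. */
  static bool
  proxy_ttl_check_and_decrement (ip4_header_sketch_t *ip4)
  {
    if (ip4->ttl <= 1)
      return false; /* would loop endlessly; drop instead */

    ip4->ttl -= 1; /* a real implementation must also update the IP checksum */
    return true;
  }
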
mogaika authored Feb 4, 2024
1 parent a120bd4 commit eef744e
Showing 25 changed files with 698 additions and 820 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -125,6 +125,7 @@ set(UPF_PLUGIN_HEADER_FILES
upf/upf_app_db.h
upf/flowtable.h
upf/flowtable_tcp.h
upf/upf_buffer_opaque.h
upf/upf_ipfix.h
upf/upf_ipfix_templates.h
upf/upf_gtpu_error.def
4 changes: 2 additions & 2 deletions test/e2e/vpp/vppstartup.go
@@ -84,8 +84,8 @@ logging {
}
{{- if .InterruptMode }}
upf {
pfcp-server-mode interrupt
session {
use-private-rx-mqs
}
{{- end }}
`
196 changes: 102 additions & 94 deletions upf/flowtable.c
@@ -33,28 +33,6 @@
} \
while (0)
#endif
#define FLOWTABLE_PROCESS_WAIT 1

vlib_node_registration_t upf_flow_node;

flow_event_handler_t
flowtable_handlers[FLOW_NUM_EVENTS << FLOWTABLE_MAX_EVENT_HANDLERS_LOG2];

void
flowtable_add_event_handler (flowtable_main_t *fm, flow_event_t event,
flow_event_handler_t handler)
{
ASSERT (event >= 0 && event < FLOW_NUM_EVENTS);
u32 start = event << FLOWTABLE_MAX_EVENT_HANDLERS_LOG2;
u32 end = start + FLOWTABLE_MAX_EVENT_HANDLERS - 1;
u32 index = start;
while (flowtable_handlers[index])
{
index++;
ALWAYS_ASSERT (index <= end);
}
flowtable_handlers[index] = handler;
}

always_inline void
flow_entry_free (flowtable_main_t *fm, flowtable_main_per_cpu_t *fmt,
@@ -72,28 +50,39 @@ flow_entry_free (flowtable_main_t *fm, flowtable_main_per_cpu_t *fmt,
pool_put (fm->flows, f);
}

always_inline void
int upf_ipfix_flow_remove_handler (flow_entry_t *f, u32 now);
int upf_proxy_flow_remove_handler (flow_entry_t *flow);
int flow_remove_counter_handler (flowtable_main_t *fm, flow_entry_t *flow);
int session_flow_unlink_handler (flowtable_main_t *fm, flow_entry_t *flow);

void
flowtable_entry_remove_internal (flowtable_main_t *fm,
flowtable_main_per_cpu_t *fmt,
flow_entry_t *f, u32 now)
{
clib_bihash_kv_48_8_t kv;

upf_debug ("Flow Remove %d", f - fm->flows);

flowtable_handle_event (fm, f, FLOW_EVENT_REMOVE, FT_ORIGIN, now);
flowtable_handle_event (fm, f, FLOW_EVENT_UNLINK, FT_ORIGIN, now);
upf_ipfix_flow_remove_handler (f, now);
upf_proxy_flow_remove_handler (f);
flow_remove_counter_handler (fm, f);

/* session unlink */
session_flow_unlink_handler (fm, f);

/* timer unlink */
flowtable_timeout_stop_entry (fm, fmt, f);

/* hashtable unlink */
clib_memcpy (kv.key, f->key.key, sizeof (kv.key));
clib_bihash_kv_48_8_t kv = { 0 };
flow_key_t *hash_key = (flow_key_t *) &kv.key;
flow_key_apply_direction (hash_key, &f->key, f->key_direction);
clib_bihash_add_del_48_8 (&fmt->flows_ht, &kv, 0 /* is_add */);

flow_entry_free (fm, fmt, f);
}

int upf_proxy_flow_expire_event_handler (flow_entry_t *flow);

/* return true if flow was removed */
always_inline bool
try_expire_single_flow (flowtable_main_t *fm, flowtable_main_per_cpu_t *fmt,
@@ -108,8 +97,7 @@ try_expire_single_flow (flowtable_main_t *fm, flowtable_main_per_cpu_t *fmt,
(f->active + f->lifetime) % FLOW_TIMER_MAX_LIFETIME, now,
fmt->time_index);

if (!keep &&
flowtable_handle_event (fm, f, FLOW_EVENT_EXPIRE, FT_ORIGIN, now) != 0)
if (!keep && upf_proxy_flow_expire_event_handler (f))
{
/* flow still in use, wait for another lifetime */
upf_debug ("Flow %d: expiration blocked by the event handler",
@@ -141,8 +129,8 @@ flowtable_timer_expire (flowtable_main_t *fm, flowtable_main_per_cpu_t *fmt,
u64 expire_cpt = 0;

/*
* Must call flowtable_timer_expire() only after timer_wheel_index_update()
* with the same 'now' value
* Must call flowtable_timer_expire() only after
* flowtable_timer_wheel_index_update() with the same 'now' value
*/
ASSERT (now % FLOW_TIMER_MAX_LIFETIME == fmt->time_index);

@@ -218,9 +206,8 @@ flowtable_lifetime_calculate (flowtable_main_t *fm, flow_key_t const *key)
return fm->timer_lifetime[FT_TIMEOUT_TYPE_TCP];

default:
return ip46_address_is_ip4 (&key->ip[FT_ORIGIN]) ?
fm->timer_lifetime[FT_TIMEOUT_TYPE_IPV4] :
fm->timer_lifetime[FT_TIMEOUT_TYPE_IPV6];
return key->is_ip4 ? fm->timer_lifetime[FT_TIMEOUT_TYPE_IPV4] :
fm->timer_lifetime[FT_TIMEOUT_TYPE_IPV6];
}

return fm->timer_lifetime[FT_TIMEOUT_TYPE_UNKNOWN];
@@ -233,13 +220,28 @@ flowtable_entry_remove (flowtable_main_t *fm, flow_entry_t *f, u32 now)
flowtable_entry_remove_internal (fm, fmt, f, now);
}

always_inline void
flowtable_entry_init_side (flow_side_t *side, u32 now)
{
side->pdr_id = ~0;
// TODO: check if it better to 0 teid instead, since ~0 is
// valid teid, but not 0
side->teid = ~0;
side->next = FT_NEXT_CLASSIFY;
side->ipfix.last_exported = now;
side->ipfix.info_index = ~0;
side->tcp.conn_index = ~0;
side->tcp.thread_index = ~0;
}

/* TODO: replace with a more appropriate hashtable */
u32
flowtable_entry_lookup_create (flowtable_main_t *fm,
flowtable_main_per_cpu_t *fmt,
clib_bihash_kv_48_8_t *kv, u64 timestamp_ns,
u32 const now, u8 is_reverse, u16 generation,
u32 session_index, int *created)
u32 const now,
flow_direction_op_t key_direction,
u16 generation, u32 session_index, int *created)
{
flow_entry_t *f;
upf_main_t *gtm = &upf_main;
@@ -259,38 +261,24 @@ flowtable_entry_lookup_create (flowtable_main_t *fm,

pool_get_zero (fm->flows, f);

clib_memcpy (f->key.key, kv->key, sizeof (f->key.key));
f->is_reverse = is_reverse;
flow_key_apply_direction (&f->key, (flow_key_t *) kv->key, key_direction);
f->key_direction = key_direction;

f->lifetime = flowtable_lifetime_calculate (fm, &f->key);
f->active = now;
f->flow_start_time = timestamp_ns;
f->flow_end_time = timestamp_ns;
f->application_id = ~0;
flow_ipfix_info (f, FT_ORIGIN) = ~0;
flow_ipfix_info (f, FT_REVERSE) = ~0;
f->cpu_index = os_get_thread_index ();
f->generation = generation;
flow_pdr_id (f, FT_ORIGIN) = ~0;
flow_pdr_id (f, FT_REVERSE) = ~0;
flow_teid (f, FT_ORIGIN) = ~0;
flow_teid (f, FT_REVERSE) = ~0;
flow_next (f, FT_ORIGIN) = FT_NEXT_CLASSIFY;
flow_next (f, FT_REVERSE) = FT_NEXT_CLASSIFY;
flow_tc (f, FT_ORIGIN).conn_index = ~0;
flow_tc (f, FT_ORIGIN).thread_index = ~0;
flow_tc (f, FT_REVERSE).conn_index = ~0;
flow_tc (f, FT_REVERSE).thread_index = ~0;
/*
* IPFIX export shouldn't happen immediately.
* Need to wait for the first interval to pass
*/
flow_last_exported (f, FT_ORIGIN) = now;
flow_last_exported (f, FT_REVERSE) = now;
f->ps_index = ~0;
f->timer_slot = ~0;

flowtable_entry_init_side (flow_side (f, FT_ORIGIN), now);
flowtable_entry_init_side (flow_side (f, FT_REVERSE), now);

session_flows_list_anchor_init (f);
flow_timeout_list_anchor_init (f);
f->timer_slot = ~0;

/* insert in timer list */
flowtable_timeout_start_entry (fm, fmt, f, now);
@@ -310,8 +298,8 @@ flowtable_entry_lookup_create (flowtable_main_t *fm,
}

void
timer_wheel_index_update (flowtable_main_t *fm, flowtable_main_per_cpu_t *fmt,
u32 now)
flowtable_timer_wheel_index_update (flowtable_main_t *fm,
flowtable_main_per_cpu_t *fmt, u32 now)
{
fmt->time_index = now % FLOW_TIMER_MAX_LIFETIME;
}
@@ -322,17 +310,16 @@ format_flow_key (u8 *s, va_list *args)
flow_key_t *key = va_arg (*args, flow_key_t *);

return format (s, "proto 0x%x, %U:%u <-> %U:%u, seid 0x%016llx", key->proto,
format_ip46_address, &key->ip[FT_ORIGIN], IP46_TYPE_ANY,
clib_net_to_host_u16 (key->port[FT_ORIGIN]),
format_ip46_address, &key->ip[FT_REVERSE], IP46_TYPE_ANY,
clib_net_to_host_u16 (key->port[FT_REVERSE]), key->up_seid);
format_ip46_address, &key->ip[FTK_EL_SRC], IP46_TYPE_ANY,
clib_net_to_host_u16 (key->port[FTK_EL_SRC]),
format_ip46_address, &key->ip[FTK_EL_DST], IP46_TYPE_ANY,
clib_net_to_host_u16 (key->port[FTK_EL_DST]), key->up_seid);
}

u8 *
format_flow (u8 *s, va_list *args)
{
flow_entry_t *flow = va_arg (*args, flow_entry_t *);
int is_reverse = flow->is_reverse;
upf_main_t *sm = &upf_main;
#if CLIB_DEBUG > 0
flowtable_main_t *fm = &flowtable_main;
@@ -354,11 +341,12 @@ format_flow (u8 *s, va_list *args)
"%U, UL pkt %u, DL pkt %u, "
"Forward PDR %u, Reverse PDR %u, "
"app %v, lifetime %u, proxy %d, spliced %d nat port %d",
format_flow_key, &flow->key, flow->stats[is_reverse].pkts,
flow->stats[is_reverse ^ FT_REVERSE].pkts,
flow_pdr_id (flow, FT_ORIGIN), flow_pdr_id (flow, FT_REVERSE),
app_name, flow->lifetime, flow->is_l3_proxy, flow->is_spliced,
flow->nat_sport);
format_flow_key, &flow->key,
flow_side (flow, FT_ORIGIN)->stats.pkts,
flow_side (flow, FT_REVERSE)->stats.pkts,
flow_side (flow, FT_ORIGIN)->pdr_id,
flow_side (flow, FT_REVERSE)->pdr_id, app_name, flow->lifetime,
flow->is_l3_proxy, flow->is_spliced, flow->nat_sport);
#if CLIB_DEBUG > 0
s = format (s, ", dont_splice %d", flow->dont_splice);
#endif
@@ -522,39 +510,59 @@ VLIB_CLI_COMMAND (upf_show_flow_timeout_command, static) =
};
/* clang-format on */

static uword
flowtable_process (vlib_main_t *vm, vlib_node_runtime_t *rt, vlib_frame_t *f)
VLIB_NODE_FN (upf_flowtable_timer_wk_process_node)
(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
{
flowtable_main_t *fm = &flowtable_main;
u32 thread_index = os_get_thread_index ();
flowtable_main_per_cpu_t *fmt = &fm->per_cpu[thread_index];
u32 current_time = (u32) vlib_time_now (vm);

flowtable_timer_wheel_index_update (fm, fmt, current_time);
u64 num_expired = flowtable_timer_expire (fm, fmt, current_time);

vlib_node_increment_counter (
vm, node->node_index, FLOWTABLE_TIMER_ERROR_TIMER_EXPIRE, num_expired);

return 0;
}

static char *flowtable_timer_input_error_strings[] = {
#define _(sym, string) string,
foreach_upf_flowtable_timer_error
#undef _
};

VLIB_REGISTER_NODE (upf_flowtable_timer_wk_process_node) = {
.name = "upf-flowtable-wk-timer",
.type = VLIB_NODE_TYPE_INPUT,
.state = VLIB_NODE_STATE_INTERRUPT,
.error_strings = flowtable_timer_input_error_strings,
.n_errors = FLOWTABLE_TIMER_N_ERROR,
};

static uword
upf_flowtable_timer_process (vlib_main_t *vm, vlib_node_runtime_t *rt,
vlib_frame_t *f)
{

while (1)
{
u32 num_expired;
u32 current_time = (u32) vlib_time_now (vm);
// TODO: support multiple cores here
// (although this is only needed for debugging)
u32 cpu_index = os_get_thread_index ();
flowtable_main_per_cpu_t *fmt = &fm->per_cpu[cpu_index];
(void) vlib_process_wait_for_event_or_clock (vm, FLOWTABLE_PROCESS_WAIT);
vlib_worker_thread_barrier_sync (vm);
timer_wheel_index_update (fm, fmt, current_time);
num_expired = flowtable_timer_expire (fm, fmt, current_time);
if (num_expired > 0)
upf_debug ("expired %d flows", num_expired);
vlib_node_increment_counter (vm, rt->node_index,
FLOWTABLE_ERROR_TIMER_EXPIRE, num_expired);
vlib_worker_thread_barrier_release (vm);
(void) vlib_process_wait_for_event_or_clock (
vm, 1.0 / FLOWTABLE_TIMER_FREQUENCY);

// send interrupts
for (int ti = 0; ti < vlib_get_n_threads (); ti++)
vlib_node_set_interrupt_pending (
vlib_get_main_by_index (ti),
upf_flowtable_timer_wk_process_node.index);
}

return 0;
}

/* clang-format off */
VLIB_REGISTER_NODE (flowtable_process_node) = {
.function = flowtable_process,
VLIB_REGISTER_NODE (upf_flowtable_timer_process_node) = {
.name = "upf-flowtable-timer",
.function = upf_flowtable_timer_process,
.type = VLIB_NODE_TYPE_PROCESS,
.process_log2_n_stack_bytes = 16,
.runtime_data_bytes = sizeof (void *),
.name = "upf-flowtable",
.state = VLIB_NODE_STATE_DISABLED,
};