
Commit 2341d07

- Minor restructuring of application-side cache implementation.
- Enhanced it for efficiency to always perform bulk (de)allocations.
- Added support for retries in case of failed free operations to the
  global pool.
marinosi committed Nov 8, 2023
1 parent c80277c commit 2341d07
Showing 5 changed files with 128 additions and 90 deletions.
5 changes: 3 additions & 2 deletions src/core/drivers/shm/channel_test.cc
@@ -339,8 +339,9 @@ TEST(ChannelFullDuplex, SendRecvMsg) {
// Allocate from ctx->cache
uint32_t current_buffers_cnt;
auto ctx = channel->ctx();
while (ctx->cached_bufs.count > 0)
buffers.push_back(ctx->cached_bufs.indices[--ctx->cached_bufs.count]);
while (ctx->app_buffer_cache.count > 0)
buffers.push_back(
ctx->app_buffer_cache.indices[--ctx->app_buffer_cache.count]);
current_buffers_cnt = buffers.size();
// Allocate from shm::channel->cache
current_buffers_cnt += channel->GetAllCachedBufferIndices(&buffers);
187 changes: 111 additions & 76 deletions src/ext/machnet.c
@@ -126,52 +126,116 @@ static int _machnet_ctrl_request(machnet_ctrl_msg_t *req,

return 0;
}

/**
* @brief Helper function to free buffer indices to the cache stored on
* stack side.
* @param ctx Pointer to the Machnet Channel context.
* @param cnt Number of buffer indices to free
* @param buffer_indices Pointer to the array of MachnetRingSlot_t entries to
* free.
* @return Number of indices freed to cache; should be equal to cnt
* @brief Allocates a specified number of buffers for use, either directly from
* the global pool or from the application's buffer cache.
*
* This function allocates `cnt` number of buffers for the Machnet channel. If
* the count exceeds the number of cached buffers (`NUM_CACHED_BUFS`), the
* allocation is made directly from the global pool. For smaller allocations, it
* tries to fulfill the request from the application's buffer cache. If the
* cache is empty, it refills the cache from the global pool. If allocation from
* the global pool fails at any point, the function returns `NULL`.
*
* @param ctx Pointer to the MachnetChannelCtx_t structure that holds channel
* context information, including the application buffer cache.
* @param cnt The number of buffers to allocate.
* @return A pointer to the first MachnetRingSlot_t element of an array
* containing the allocated buffer indices if the allocation is successful;
* otherwise, `NULL`.
*/
static inline __attribute__((always_inline)) uint32_t
_machnet_buf_free_to_cache(MachnetChannelCtx_t *ctx, uint32_t cnt,
const MachnetRingSlot_t *buffer_indices) {
uint32_t ret;
uint32_t idx = ctx->cached_bufs.count;
// If cache is empty, fill the cache directly
for (ret = 0; ret < cnt; ret++) {
ctx->cached_bufs.indices[idx + ret] = buffer_indices[ret];
ctx->cached_bufs.count++;
}
// Ensure that we processed all buffer indices
assert(ret == cnt);
return ret;
static inline MachnetRingSlot_t *_machnet_buffers_alloc(
MachnetChannelCtx_t *ctx, uint32_t cnt) {
MachnetRingSlot_t *buffer_indices = __machnet_channel_buffer_index_table(ctx);

if (cnt > NUM_CACHED_BUFS) {
// This is a large bulk allocation, so we can bypass the application cache.
uint32_t ret =
__machnet_channel_buf_alloc_bulk(ctx, cnt, buffer_indices, NULL);
if (ret != cnt) {
return NULL;
}
return buffer_indices;
}

// Try to allocate from the application cache.
uint32_t index = 0;
while (index < cnt) {
if (ctx->app_buffer_cache.count == 0) {
// The cache is empty, so we need to allocate from the global pool.
ctx->app_buffer_cache.count += __machnet_channel_buf_alloc_bulk(
ctx, NUM_CACHED_BUFS, ctx->app_buffer_cache.indices, NULL);
if (ctx->app_buffer_cache.count == 0) {
// We failed to allocate from the global pool.
goto fail;
}
}

buffer_indices[index++] =
ctx->app_buffer_cache.indices[--ctx->app_buffer_cache.count];
}

return buffer_indices;

fail:
// Bulk allocation has failed; return partial allocation to the application
// cache.
for (uint32_t i = 0; i < index; i++) {
ctx->app_buffer_cache.indices[ctx->app_buffer_cache.count++] =
buffer_indices[i];
}

return NULL;
}
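
/*
 * Illustrative sketch (not part of this commit): how a caller is expected to
 * use _machnet_buffers_alloc(). The helper name `example_prepare_tx_buffers`
 * is hypothetical; the error handling mirrors machnet_sendmsg() below, which
 * simply fails the call when NULL is returned.
 */
static int example_prepare_tx_buffers(MachnetChannelCtx_t *ctx,
                                      uint32_t buffers_nr) {
  MachnetRingSlot_t *buf_index_table = _machnet_buffers_alloc(ctx, buffers_nr);
  if (buf_index_table == NULL) {
    // Neither the application cache nor the global pool could satisfy the
    // request; any partially allocated indices were already returned to the
    // cache inside _machnet_buffers_alloc().
    return -1;
  }
  // buf_index_table[0 .. buffers_nr - 1] now holds valid buffer indices,
  // backed by the channel's buffer index table scratch area.
  return 0;
}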

/**
* @brief Helper function to free used buffer indices; It first tries to free
* to cache stored on machnet stack side, then frees the rest back to global
* buffer ring
* @param ctx Pointer to the Machnet Channel context.
* @param cnt Number of buffer indices to free
* @param buffer_indices Pointer to the array of MachnetRingSlot_t entries to
* free
* @return Number of freed indices
* @brief Releases a given number of buffers by either caching them or freeing
* them to the global pool.
*
* This function attempts to release a specified count of buffers back into the
* Machnet channel context's application buffer cache. If the cache is full,
* it will free half of the cached buffers to the global buffer pool. If after
* several retries it is unable to free buffers to the global pool, the function
* aborts the program execution.
*
* @param ctx Pointer to the MachnetChannelCtx_t structure that represents the
* channel context which holds the application buffer cache.
* @param cnt The number of buffers to be released.
* @param buffer_indices Array of MachnetRingSlot_t that contains the indices of
* the buffers that need to be released.
*
* @warning If the function fails to free the buffers to the global pool after a
* certain number of retries, it will output an error message to stderr
* and call abort() to terminate program execution.
*/
static inline __attribute__((always_inline)) uint32_t _machnet_buf_free(
MachnetChannelCtx_t *ctx, uint32_t cnt, MachnetRingSlot_t *buffer_indices) {
uint32_t available_capacity, to_free, ret;
available_capacity = NUM_CACHED_BUFS - ctx->cached_bufs.count;
to_free = MIN(cnt, available_capacity);
ret =
(to_free) ? _machnet_buf_free_to_cache(ctx, to_free, buffer_indices) : 0;
if (cnt > available_capacity) {
ret += __machnet_channel_buf_free_bulk(ctx, cnt - available_capacity,
buffer_indices + available_capacity);
}
return ret;
static inline void _machnet_buffers_release(MachnetChannelCtx_t *ctx,
uint32_t cnt,
MachnetRingSlot_t *buffer_indices) {
uint32_t index = 0;
while (index < cnt) {
uint32_t retries = 5;
while (ctx->app_buffer_cache.count == NUM_CACHED_BUFS) {
// The cache is full, free to global pool.
uint32_t elements_to_free = ctx->app_buffer_cache.count / 2;
MachnetRingSlot_t *indices_to_free =
ctx->app_buffer_cache.indices + (NUM_CACHED_BUFS - elements_to_free);
ctx->app_buffer_cache.count -= __machnet_channel_buf_free_bulk(
ctx, elements_to_free, indices_to_free);

if (retries-- == 0 && ctx->app_buffer_cache.count == NUM_CACHED_BUFS) {
/*
* XXX (ilias): If we reach here, we have failed to free the buffers to
* the global pool and we are going to leak them. Terminate execution.
*/
fprintf(stderr, "ERROR: Failed to free buffers to global pool.\n");
abort();
}
}

ctx->app_buffer_cache.indices[ctx->app_buffer_cache.count++] =
buffer_indices[index++];
}
}
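
/*
 * Illustrative sketch (not part of this commit): batched release in the style
 * of machnet_recvmsg(), which accumulates freed indices in a small local array
 * and hands them to _machnet_buffers_release() once the batch is full. The
 * helper name and batch size below are hypothetical.
 */
static void example_release_batched(MachnetChannelCtx_t *ctx,
                                    const MachnetRingSlot_t *used,
                                    uint32_t used_nr) {
  enum { kExampleBatchSize = 16 };
  MachnetRingSlot_t batch[kExampleBatchSize];
  uint32_t batch_nr = 0;

  for (uint32_t i = 0; i < used_nr; i++) {
    batch[batch_nr++] = used[i];
    if (batch_nr == kExampleBatchSize) {
      // Full batch: indices go to the application cache first, overflowing to
      // the global pool when the cache fills up.
      _machnet_buffers_release(ctx, batch_nr, batch);
      batch_nr = 0;
    }
  }
  // Release any remainder (a count of zero is a no-op).
  _machnet_buffers_release(ctx, batch_nr, batch);
}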

int machnet_init() {
@@ -483,35 +547,10 @@ int machnet_sendmsg(const void *channel_ctx, const MachnetMsgHdr_t *msghdr) {
// them.
const uint32_t buffers_nr =
(msghdr->msg_size + kMsgBufPayloadMax - 1) / kMsgBufPayloadMax;
MachnetRingSlot_t *buf_index_table =
__machnet_channel_buffer_index_table(ctx);
// if buffer cache empty fill it
if (ctx->cached_bufs.count == 0) {
if (__machnet_channel_buf_alloc_bulk(ctx, NUM_CACHED_BUFS,
ctx->cached_bufs.indices,
NULL) == NUM_CACHED_BUFS) {
ctx->cached_bufs.count = NUM_CACHED_BUFS;
}
}

if (buffers_nr <= ctx->cached_bufs.count) {
// get all buffers from cache
for (uint32_t i = 0; i < buffers_nr; i++) {
buf_index_table[i] = ctx->cached_bufs.indices[ctx->cached_bufs.count - 1];
ctx->cached_bufs.count--;
}
} else {
uint32_t remaining = buffers_nr - ctx->cached_bufs.count;
// allocate directly from ring
if (__machnet_channel_buf_alloc_bulk(ctx, remaining, buf_index_table,
NULL) != remaining) {
return -1;
}
// get the rest from cache
for (uint32_t i = remaining; i < buffers_nr; i++) {
buf_index_table[i] = ctx->cached_bufs.indices[ctx->cached_bufs.count - 1];
ctx->cached_bufs.count--;
}
MachnetRingSlot_t *buf_index_table = _machnet_buffers_alloc(ctx, buffers_nr);
if (buf_index_table == NULL) {
// We failed to allocate the buffers.
return -1;
}

// Gather all message segments.
@@ -620,7 +659,6 @@ int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr) {
MachnetChannelCtx_t *ctx = (MachnetChannelCtx_t *)channel_ctx;

const uint32_t kBufferBatchSize = 16;
uint32_t ret __attribute__((unused));

// Dequeue a message from the ring.
MachnetRingSlot_t buffer_index;
@@ -691,8 +729,7 @@ int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr) {
// Do a batch buffer release if we reached the threshold.
if (buffer_indices_index == kBufferBatchSize) {
// release to the buf_ring
ret = _machnet_buf_free(ctx, buffer_indices_index, buffer_indices);
assert(ret == buffer_indices_index);
_machnet_buffers_release(ctx, buffer_indices_index, buffer_indices);
buffer_indices_index = 0;
}
}
@@ -709,8 +746,7 @@ int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr) {
msghdr->flow_info = flow_info;

// Free up any remaining buffers.
ret = _machnet_buf_free(ctx, buffer_indices_index, buffer_indices);
assert(ret == buffer_indices_index);
_machnet_buffers_release(ctx, buffer_indices_index, buffer_indices);

// Success.
return 1;
@@ -725,8 +761,7 @@ int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr) {
buffer = NULL;
}
if (buffer == NULL || buffer_indices_index == kBufferBatchSize) {
ret = _machnet_buf_free(ctx, buffer_indices_index, buffer_indices);
assert(ret == buffer_indices_index);
_machnet_buffers_release(ctx, buffer_indices_index, buffer_indices);
buffer_indices_index = 0;
}
}
14 changes: 8 additions & 6 deletions src/ext/machnet_common.h
@@ -115,14 +115,16 @@ struct MachnetChannelCtrlCtx {
typedef struct MachnetChannelCtrlCtx MachnetChannelCtrlCtx_t;

/*
* The `CachedBufs` helps to manage the cached buffer indices array which is
 * NUM_CACHED_BUFS long.
* This data structure is used to implement a small cache of buffer indices
* used exclusively by the application. This reduces contention on the global
* buffer pool (which uses atomic operations for MP-safety).
*/
struct CachedBufs {
struct MachnetChannelAppBufferCache {
uint32_t count;
MachnetRingSlot_t indices[NUM_CACHED_BUFS];
};
typedef struct CachedBufs CachedBufs_t;
typedef struct MachnetChannelAppBufferCache MachnetChannelAppBufferCache_t;
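
/*
 * Illustrative sketch (not part of this commit): the cache is used as a small
 * LIFO stack of buffer indices that only the owning application touches, so
 * no atomic operations are needed (unlike the global buffer ring). The helper
 * names below are hypothetical.
 */
static inline int example_cache_pop(MachnetChannelAppBufferCache_t *cache,
                                    MachnetRingSlot_t *slot) {
  if (cache->count == 0) return 0;         // Cache empty.
  *slot = cache->indices[--cache->count];  // Pop the most recently freed index.
  return 1;
}

static inline int example_cache_push(MachnetChannelAppBufferCache_t *cache,
                                     MachnetRingSlot_t slot) {
  if (cache->count == NUM_CACHED_BUFS) return 0;  // Cache full.
  cache->indices[cache->count++] = slot;          // Push onto the top.
  return 1;
}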

/**
* The `MachnetChannelCtx' holds all the metadata information (context) of an
* Machnet Channel.
@@ -139,7 +141,7 @@ struct MachnetChannelCtx {
char name[MACHNET_CHANNEL_NAME_MAX_LEN];
MachnetChannelCtrlCtx_t ctrl_ctx; // Control channel's specific metadata.
MachnetChannelDataCtx_t data_ctx; // Dataplane channel's specific metadata.
CachedBufs_t cached_bufs;
MachnetChannelAppBufferCache_t app_buffer_cache;
} __attribute__((aligned(CACHE_LINE_SIZE)));
typedef struct MachnetChannelCtx MachnetChannelCtx_t;

@@ -564,7 +566,7 @@ __machnet_channel_buffers_avail(const MachnetChannelCtx_t *ctx) {
assert(ctx != NULL);

jring_t *buf_ring = __machnet_channel_buf_ring(ctx);
return ctx->cached_bufs.count + jring_count(buf_ring);
return ctx->app_buffer_cache.count + jring_count(buf_ring);
}

/**
2 changes: 1 addition & 1 deletion src/ext/machnet_private.h
@@ -161,7 +161,7 @@ static inline int __machnet_channel_dataplane_init(
ctx->ctrl_ctx.req_id = 0;

// Initialize buffer cache
ctx->cached_bufs.count = 0;
ctx->app_buffer_cache.count = 0;

// Clear out statistics.
ctx->data_ctx.stats_ofs = sizeof(*ctx);
10 changes: 5 additions & 5 deletions src/ext/machnet_test.cc
@@ -168,12 +168,12 @@ uint32_t bounce_machnet_to_app(const MachnetChannelCtx_t *ctx) {
// a valid state.
bool check_buffer_pool(const MachnetChannelCtx_t *ctx) {
// Release cached buffers to the pool
if (__machnet_channel_buf_free_bulk(ctx, ctx->cached_bufs.count,
ctx->cached_bufs.indices) !=
ctx->cached_bufs.count) {
if (__machnet_channel_buf_free_bulk(ctx, ctx->app_buffer_cache.count,
ctx->app_buffer_cache.indices) !=
ctx->app_buffer_cache.count) {
return false;
}
*__DECONST(uint32_t *, &ctx->cached_bufs.count) = 0;
*__DECONST(uint32_t *, &ctx->app_buffer_cache.count) = 0;

jring_t *buf_ring = __machnet_channel_buf_ring(ctx);
auto nbuffers = buf_ring->capacity;
@@ -314,7 +314,7 @@ TEST(MachnetTest, MultiBufferSendRecvMsg) {
EXPECT_EQ(ret, 1) << "Msg size: " << msg_size;
EXPECT_EQ(rx_msg_data, tx_msg_data) << "Msg size: " << msg_size;
EXPECT_EQ(memcmp(&rx_msghdr.flow_info, &flow, sizeof(flow)), 0);
EXPECT_TRUE(check_buffer_pool(g_channel_ctx));
EXPECT_TRUE(check_buffer_pool(g_channel_ctx)) << "Msg size: " << msg_size;
EXPECT_EQ(jring_full(__machnet_channel_buf_ring(g_channel_ctx)), 1)
<< "Available buffers: "
<< jring_count(__machnet_channel_buf_ring(g_channel_ctx));