Skip to content

Commit

Permalink
TL/UCP: complete memory map extensions
Browse files Browse the repository at this point in the history
API: switch memory type DPU to EXPORTED
  • Loading branch information
ferrol aderholdt committed Feb 8, 2024
1 parent f29545c commit 5c25b9c
Show file tree
Hide file tree
Showing 10 changed files with 480 additions and 55 deletions.
3 changes: 2 additions & 1 deletion src/components/tl/ucp/alltoall/alltoall.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_init(ucc_base_coll_args_t *coll_args,
ucc_status_t status;

ALLTOALL_TASK_CHECK(coll_args->args, tl_team);

#if 1
if (!(coll_args->args.mask & UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER)) {
tl_error(UCC_TL_TEAM_LIB(tl_team),
"global work buffer not provided nor associated with team");
Expand All @@ -95,6 +95,7 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_init(ucc_base_coll_args_t *coll_args,
goto out;
}
}
#endif
task = ucc_tl_ucp_init_task(coll_args, team);
*task_h = &task->super;
task->super.post = ucc_tl_ucp_alltoall_onesided_start;
Expand Down
12 changes: 8 additions & 4 deletions src/components/tl/ucp/alltoall/alltoall_onesided.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "alltoall.h"
#include "core/ucc_progress_queue.h"
#include "utils/ucc_math.h"
#include "tl_ucp_coll.h"
#include "tl_ucp_sendrecv.h"

void ucc_tl_ucp_alltoall_onesided_progress(ucc_coll_task_t *ctask);
Expand All @@ -24,25 +25,28 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_start(ucc_coll_task_t *ctask)
ucc_rank_t gsize = UCC_TL_TEAM_SIZE(team);
ucc_rank_t start = (grank + 1) % gsize;
long * pSync = TASK_ARGS(task).global_work_buffer;
ucc_memory_type_t mtype;
ucc_rank_t peer;

ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);

ucc_tl_ucp_coll_dynamic_segments(&TASK_ARGS(task), task);
/* TODO: change when support for library-based work buffers is complete */
nelems = (nelems / gsize) * ucc_dt_size(TASK_ARGS(task).src.info.datatype);
dest = dest + grank * nelems;
mtype = (TASK_ARGS(task).src.info.mem_type == UCC_MEMORY_TYPE_EXPORTED) ? UCC_MEMORY_TYPE_HOST : TASK_ARGS(task).src.info.mem_type;

UCPCHECK_GOTO(ucc_tl_ucp_put_nb((void *)(src + start * nelems),
(void *)dest, nelems, start, team, task),
(void *)dest, nelems, start, mtype, team, task),
task, out);
UCPCHECK_GOTO(ucc_tl_ucp_atomic_inc(pSync, start, team), task, out);

for (peer = (start + 1) % gsize; peer != start; peer = (peer + 1) % gsize) {
UCPCHECK_GOTO(ucc_tl_ucp_put_nb((void *)(src + peer * nelems),
(void *)dest, nelems, peer, team, task),
(void *)dest, nelems, peer, mtype, team, task),
task, out);
UCPCHECK_GOTO(ucc_tl_ucp_atomic_inc(pSync, peer, team), task,
out);
}

return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
out:
return task->super.status;
Expand Down
4 changes: 3 additions & 1 deletion src/components/tl/ucp/alltoallv/alltoallv_onesided.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ ucc_status_t ucc_tl_ucp_alltoallv_onesided_start(ucc_coll_task_t *ctask)
ucc_aint_t *d_disp = TASK_ARGS(task).dst.info_v.displacements;
size_t sdt_size = ucc_dt_size(TASK_ARGS(task).src.info_v.datatype);
size_t rdt_size = ucc_dt_size(TASK_ARGS(task).dst.info_v.datatype);
ucc_memory_type_t mtype;
ucc_rank_t peer;
size_t sd_disp, dd_disp, data_size;

ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);

mtype = (TASK_ARGS(task).src.info_v.mem_type == UCC_MEMORY_TYPE_EXPORTED) ? UCC_MEMORY_TYPE_HOST : TASK_ARGS(task).src.info_v.mem_type;
/* perform a put to each member peer using the peer's index in the
* destination displacement. */
for (peer = (grank + 1) % gsize; task->onesided.put_posted < gsize;
Expand All @@ -46,7 +48,7 @@ ucc_status_t ucc_tl_ucp_alltoallv_onesided_start(ucc_coll_task_t *ctask)

UCPCHECK_GOTO(ucc_tl_ucp_put_nb(PTR_OFFSET(src, sd_disp),
PTR_OFFSET(dest, dd_disp),
data_size, peer, team, task),
data_size, peer, mtype, team, task),
task, out);
UCPCHECK_GOTO(ucc_tl_ucp_atomic_inc(pSync, peer, team), task, out);
}
Expand Down
80 changes: 77 additions & 3 deletions src/components/tl/ucp/tl_ucp.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#define ONESIDED_SYNC_SIZE 1
#define ONESIDED_REDUCE_SIZE 4


/* TL/UCP interface object: wraps the generic ucc_tl_iface_t as its only
 * member and adds no UCP-specific fields. */
typedef struct ucc_tl_ucp_iface {
ucc_tl_iface_t super;
} ucc_tl_ucp_iface_t;
Expand Down Expand Up @@ -96,10 +97,33 @@ typedef struct ucc_tl_ucp_lib {
UCC_CLASS_DECLARE(ucc_tl_ucp_lib_t, const ucc_base_lib_params_t *,
const ucc_base_config_t *);

/* dynamic segments stored in a flat buffer. An example with 4 segments on
* two PEs, with segments stored two at a time (collective with src/dst pair):
rva/key => (rva, len, key size, key) tuple
+-----------------------------+-----------------------------+
| seg group 0 (seg 0 + seg 1) | seg group 1 (seg 2 + seg 3) |
+--------------+--------------+--------------+--------------+
| rva/key pe 0 | rva/key pe 1 | rva/key pe 0 | rva/key pe 1 |
+--------------+--------------+--------------+--------------+
*/
/*
 * Bookkeeping for dynamically registered segments kept in one flat buffer.
 * The UCC_TL_UCP_REMOTE_DYN_* accessors in this header turn the arrays below
 * into a byte offset inside dyn_buff.
 * NOTE(review): the accessors index seg_groups, seg_group_start and
 * starting_seg by segment id, and seg_group_size and num_seg_per_group by
 * segment group id -- confirm the allocation sizes match that usage.
 */
typedef struct ucc_tl_ucp_dynamic_seg {
void *dyn_buff; /* flat buffer with rva, keys, etc. */
size_t buff_size; /* presumably the size of dyn_buff in bytes -- TODO confirm */
size_t *seg_groups; /* segment to segment group mapping (indexed by segment) */
size_t *seg_group_start;/* offset of dyn_buff to start of seg group; read as seg_group_start[seg] by the accessors */
size_t *seg_group_size; /* storage size of a seg group; multiplied by the rank to reach that rank's slot */
size_t *starting_seg; /* starting seg for a seg group; read as starting_seg[seg] by the accessors */
size_t *num_seg_per_group; /* segments in each group, indexed by group id */
size_t num_groups; /* presumably the number of segment groups -- TODO confirm */
} ucc_tl_ucp_dynamic_seg_t;

/*
 * Description of one memory segment tracked by the TL/UCP context (the
 * context holds arrays of these as remote_info / dynamic_remote_info).
 * NOTE(review): the field roles below are inferred from their names; how
 * each one is populated is not visible in this header -- confirm against
 * the context code.
 */
typedef struct ucc_tl_ucp_remote_info {
void * va_base; /* presumably the base virtual address of the segment */
size_t len; /* presumably the segment length in bytes */
void * mem_h; /* presumably the memory handle for the segment */
void * packed_memh; /* presumably the packed (exportable) memory handle */
void * packed_key; /* presumably the packed remote key */
size_t packed_key_len; /* presumably the size of packed_key in bytes */
} ucc_tl_ucp_remote_info_t;
Expand All @@ -120,9 +144,13 @@ typedef struct ucc_tl_ucp_context {
ucc_tl_ucp_worker_t service_worker;
uint32_t service_worker_throttling_count;
ucc_mpool_t req_mp;
ucc_tl_ucp_remote_info_t * remote_info;
ucc_tl_ucp_remote_info_t *remote_info;
ucc_tl_ucp_remote_info_t *dynamic_remote_info;
ucc_tl_ucp_dynamic_seg_t dyn_seg;
ucp_rkey_h * rkeys;
uint64_t n_rinfo_segs;
uint64_t n_dynrinfo_segs;
uint64_t max_segs;
uint64_t ucp_memory_types;
int topo_required;
} ucc_tl_ucp_context_t;
Expand All @@ -135,8 +163,6 @@ typedef struct ucc_tl_ucp_team {
ucc_status_t status;
uint32_t seq_num;
ucc_tl_ucp_task_t *preconnect_task;
void * va_base[MAX_NR_SEGMENTS];
size_t base_length[MAX_NR_SEGMENTS];
ucc_tl_ucp_worker_t * worker;
ucc_tl_ucp_team_config_t cfg;
const char * tuning_str;
Expand Down Expand Up @@ -190,6 +216,13 @@ extern ucc_config_field_t ucc_tl_ucp_lib_config_table[];
/*
 * Remote key slot for segment _seg of rank _rank.
 * The rkeys array is laid out rank-major with n_rinfo_segs entries per rank.
 * Every argument is parenthesized so that expressions such as "rank + 1"
 * index the intended slot. _ctx is evaluated twice, so it must be free of
 * side effects.
 */
#define UCC_TL_UCP_REMOTE_RKEY(_ctx, _rank, _seg)                              \
    ((_ctx)->rkeys[(_rank) * (_ctx)->n_rinfo_segs + (_seg)])

/*
 * Remote key slot for dynamic segment _seg of rank _rank.
 * Dynamic keys live in the same rkeys array, after the first
 * _size * n_rinfo_segs entries, with n_dynrinfo_segs entries per rank.
 * Every argument is parenthesized so that compound expressions index the
 * intended slot. _ctx is evaluated several times, so it must be free of
 * side effects.
 */
#define UCC_TL_UCP_DYN_REMOTE_RKEY(_ctx, _rank, _size, _seg)                   \
    ((_ctx)->rkeys[(_size) * (_ctx)->n_rinfo_segs +                            \
                   (_rank) * (_ctx)->n_dynrinfo_segs + (_seg)])

/*
#define UCC_TL_UCP_REMOTE_DYN_RVA(_ctx, _rank, _seg) \
(PTR_OFFSET((_ctx)->dyn_seg.dyn_buff, (_ctx)->dyn_seg.seg_group_start[_seg] + (_ctx)->dyn_seg.seg_groups[_seg] * _rank))
*/
extern ucs_memory_type_t ucc_memtype_to_ucs[UCC_MEMORY_TYPE_LAST+1];

void ucc_tl_ucp_pre_register_mem(ucc_tl_ucp_team_t *team, void *addr,
Expand All @@ -198,4 +231,45 @@ void ucc_tl_ucp_pre_register_mem(ucc_tl_ucp_team_t *team, void *addr,
ucc_status_t ucc_tl_ucp_ctx_remote_populate(ucc_tl_ucp_context_t *ctx,
ucc_mem_map_params_t map,
ucc_team_oob_coll_t oob);

// FIXME convert to macro
/**
 * Read the 64-bit value stored for dynamic segment @a seg in the slot of
 * @a rank (the "rva" entry of the flat dynamic-segment buffer).
 *
 * The byte offset into ctx->dyn_seg.dyn_buff is
 *   seg_group_start[seg] + seg_group_size[group] * rank
 *   + (seg - starting_seg[seg]) * sizeof(uint64_t)
 * where group = seg_groups[seg].
 *
 * @param ctx  TL/UCP context owning the dynamic-segment buffer.
 * @param rank Rank whose slot is read.
 * @param seg  Dynamic segment index.
 * @return The uint64_t found at the computed offset.
 *
 * No bounds checking is done on @a seg or @a rank.
 * NOTE(review): seg_group_start and starting_seg are indexed by segment
 * here although their field comments describe per-group data -- confirm.
 */
static inline uint64_t UCC_TL_UCP_REMOTE_DYN_RVA(ucc_tl_ucp_context_t *ctx,
                                                 ucc_rank_t rank,
                                                 uint64_t seg)
{
    /* size_t matches the element type of seg_groups; avoids narrowing */
    size_t    group      = ctx->dyn_seg.seg_groups[seg];
    uint64_t  group_off  = ctx->dyn_seg.seg_group_start[seg];
    uint64_t  rank_off   = ctx->dyn_seg.seg_group_size[group] * rank;
    uint64_t  entry_off  = (seg - ctx->dyn_seg.starting_seg[seg]) *
                           sizeof(uint64_t);
    uint64_t *prva       = PTR_OFFSET(ctx->dyn_seg.dyn_buff,
                                      group_off + rank_off + entry_off);

    return *prva;
}

// FIXME convert to macro
/**
 * Read the 64-bit length stored for dynamic segment @a seg in the slot of
 * @a rank.
 *
 * Uses the same offset as UCC_TL_UCP_REMOTE_DYN_RVA, advanced by one
 * uint64_t per segment of the group (num_seg_per_group[group] entries), i.e.
 *   sizeof(uint64_t) * num_seg_per_group[group]
 *   + seg_group_start[seg] + seg_group_size[group] * rank
 *   + (seg - starting_seg[seg]) * sizeof(uint64_t)
 * where group = seg_groups[seg].
 *
 * @param ctx  TL/UCP context owning the dynamic-segment buffer.
 * @param rank Rank whose slot is read.
 * @param seg  Dynamic segment index.
 * @return The uint64_t found at the computed offset.
 *
 * No bounds checking is done on @a seg or @a rank.
 */
static inline uint64_t UCC_TL_UCP_REMOTE_DYN_LEN(ucc_tl_ucp_context_t *ctx,
                                                 ucc_rank_t rank,
                                                 uint64_t seg)
{
    /* size_t matches the element type of seg_groups; avoids narrowing */
    size_t    group      = ctx->dyn_seg.seg_groups[seg];
    /* skip the first per-segment uint64_t array of this rank's slot */
    uint64_t  field_off  = sizeof(uint64_t) *
                           ctx->dyn_seg.num_seg_per_group[group];
    uint64_t  group_off  = ctx->dyn_seg.seg_group_start[seg];
    uint64_t  rank_off   = ctx->dyn_seg.seg_group_size[group] * rank;
    uint64_t  entry_off  = (seg - ctx->dyn_seg.starting_seg[seg]) *
                           sizeof(uint64_t);
    uint64_t *plen       = PTR_OFFSET(ctx->dyn_seg.dyn_buff,
                                      field_off + group_off + rank_off +
                                      entry_off);

    return *plen;
}

// FIXME convert to macro
/**
 * Read the 64-bit key size stored for dynamic segment @a seg in the slot of
 * @a rank.
 *
 * Uses the same offset as UCC_TL_UCP_REMOTE_DYN_RVA, advanced by two
 * uint64_t per segment of the group, i.e.
 *   2 * sizeof(uint64_t) * num_seg_per_group[group]
 *   + seg_group_start[seg] + seg_group_size[group] * rank
 *   + (seg - starting_seg[seg]) * sizeof(uint64_t)
 * where group = seg_groups[seg].
 *
 * @param ctx  TL/UCP context owning the dynamic-segment buffer.
 * @param rank Rank whose slot is read.
 * @param seg  Dynamic segment index.
 * @return The uint64_t found at the computed offset.
 *
 * No bounds checking is done on @a seg or @a rank.
 */
static inline uint64_t UCC_TL_UCP_REMOTE_DYN_KEY_SIZE(ucc_tl_ucp_context_t *ctx,
                                                      ucc_rank_t rank,
                                                      uint64_t seg)
{
    /* size_t matches the element type of seg_groups; avoids narrowing */
    size_t    group      = ctx->dyn_seg.seg_groups[seg];
    /* skip the first two per-segment uint64_t arrays of this rank's slot */
    uint64_t  field_off  = 2 * sizeof(uint64_t) *
                           ctx->dyn_seg.num_seg_per_group[group];
    uint64_t  group_off  = ctx->dyn_seg.seg_group_start[seg];
    uint64_t  rank_off   = ctx->dyn_seg.seg_group_size[group] * rank;
    uint64_t  entry_off  = (seg - ctx->dyn_seg.starting_seg[seg]) *
                           sizeof(uint64_t);
    uint64_t *pkey_size  = PTR_OFFSET(ctx->dyn_seg.dyn_buff,
                                      field_off + group_off + rank_off +
                                      entry_off);

    return *pkey_size;
}

// FIXME convert to macro
/**
 * Return a pointer into the key area of @a rank's slot for the segment
 * group that contains dynamic segment @a seg.
 *
 * The byte offset into ctx->dyn_seg.dyn_buff is
 *   3 * sizeof(uint64_t) * num_seg_per_group[group]
 *   + seg_group_start[seg] + seg_group_size[group] * rank + offset
 * where group = seg_groups[seg]. @a seg only selects the group and its
 * start; the position inside the key area comes entirely from @a offset.
 *
 * @param ctx    TL/UCP context owning the dynamic-segment buffer.
 * @param rank   Rank whose slot is addressed.
 * @param offset Byte offset added after the three uint64_t arrays;
 *               supplied by the caller.
 * @param seg    Dynamic segment index.
 * @return Pointer into dyn_buff; nothing is dereferenced here.
 *
 * No bounds checking is done on @a seg, @a rank or @a offset.
 */
static inline void * UCC_TL_UCP_REMOTE_DYN_KEY(ucc_tl_ucp_context_t *ctx,
                                               ucc_rank_t rank,
                                               size_t offset,
                                               uint64_t seg)
{
    /* size_t matches the element type of seg_groups; avoids narrowing */
    size_t   group     = ctx->dyn_seg.seg_groups[seg];
    /* skip the three per-segment uint64_t arrays of this rank's slot */
    uint64_t field_off = 3 * sizeof(uint64_t) *
                         ctx->dyn_seg.num_seg_per_group[group];
    uint64_t group_off = ctx->dyn_seg.seg_group_start[seg];
    uint64_t rank_off  = ctx->dyn_seg.seg_group_size[group] * rank;

    return PTR_OFFSET(ctx->dyn_seg.dyn_buff,
                      field_off + group_off + rank_off + offset);
}
#endif
Loading

0 comments on commit 5c25b9c

Please sign in to comment.