Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support for Dynamic Symmetric Memory #909

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
11 changes: 8 additions & 3 deletions src/components/tl/ucp/alltoall/alltoall.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ ucc_status_t ucc_tl_ucp_alltoall_pairwise_init(ucc_base_coll_args_t *coll_args,
}

ucc_status_t ucc_tl_ucp_alltoall_onesided_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t *team,
ucc_coll_task_t **task_h)
ucc_base_team_t *team,
ucc_coll_task_t **task_h)
{
ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
ucc_tl_ucp_task_t *task;
Expand All @@ -99,7 +99,12 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_init(ucc_base_coll_args_t *coll_args,
*task_h = &task->super;
task->super.post = ucc_tl_ucp_alltoall_onesided_start;
task->super.progress = ucc_tl_ucp_alltoall_onesided_progress;
status = UCC_OK;

status = ucc_tl_ucp_coll_dynamic_segment_init(&coll_args->args, task);
if (UCC_OK != status) {
tl_error(UCC_TL_TEAM_LIB(tl_team),
"failed to initialize dynamic segments");
}
out:
return status;
}
22 changes: 16 additions & 6 deletions src/components/tl/ucp/alltoall/alltoall_onesided.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "alltoall.h"
#include "core/ucc_progress_queue.h"
#include "utils/ucc_math.h"
#include "tl_ucp_coll.h"
#include "tl_ucp_sendrecv.h"

void ucc_tl_ucp_alltoall_onesided_progress(ucc_coll_task_t *ctask);
Expand All @@ -23,24 +24,33 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_start(ucc_coll_task_t *ctask)
ucc_rank_t grank = UCC_TL_TEAM_RANK(team);
ucc_rank_t gsize = UCC_TL_TEAM_SIZE(team);
ucc_rank_t start = (grank + 1) % gsize;
long * pSync = TASK_ARGS(task).global_work_buffer;
long *pSync = TASK_ARGS(task).global_work_buffer;
ucc_memory_type_t mtype = TASK_ARGS(task).src.info.mem_type;
ucc_rank_t peer;
ucc_status_t status;

ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);
status = ucc_tl_ucp_coll_dynamic_segment_exchange(task);
if (UCC_OK != status) {
task->super.status = status;
goto out;
}

/* TODO: change when support for library-based work buffers is complete */
nelems = (nelems / gsize) * ucc_dt_size(TASK_ARGS(task).src.info.datatype);
dest = dest + grank * nelems;
UCPCHECK_GOTO(ucc_tl_ucp_put_nb((void *)(src + start * nelems),
(void *)dest, nelems, start, team, task),
(void *)dest, nelems, start, mtype, team,
task),
task, out);
UCPCHECK_GOTO(ucc_tl_ucp_atomic_inc(pSync, start, team), task, out);

for (peer = (start + 1) % gsize; peer != start; peer = (peer + 1) % gsize) {
UCPCHECK_GOTO(ucc_tl_ucp_put_nb((void *)(src + peer * nelems),
(void *)dest, nelems, peer, team, task),
(void *)dest, nelems, peer, mtype, team,
task),
task, out);
UCPCHECK_GOTO(ucc_tl_ucp_atomic_inc(pSync, peer, team), task,
out);
UCPCHECK_GOTO(ucc_tl_ucp_atomic_inc(pSync, peer, team), task, out);
}

return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
Expand All @@ -60,5 +70,5 @@ void ucc_tl_ucp_alltoall_onesided_progress(ucc_coll_task_t *ctask)
}

pSync[0] = 0;
task->super.status = UCC_OK;
task->super.status = ucc_tl_ucp_coll_dynamic_segment_finalize(task);
}
17 changes: 15 additions & 2 deletions src/components/tl/ucp/alltoallv/alltoallv_onesided.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,17 @@ ucc_status_t ucc_tl_ucp_alltoallv_onesided_start(ucc_coll_task_t *ctask)
ucc_aint_t *d_disp = TASK_ARGS(task).dst.info_v.displacements;
size_t sdt_size = ucc_dt_size(TASK_ARGS(task).src.info_v.datatype);
size_t rdt_size = ucc_dt_size(TASK_ARGS(task).dst.info_v.datatype);
ucc_memory_type_t mtype = TASK_ARGS(task).src.info_v.mem_type;
ucc_rank_t peer;
ucc_status_t status;
size_t sd_disp, dd_disp, data_size;

ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);
status = ucc_tl_ucp_coll_dynamic_segment_exchange(task);
if (UCC_OK != status) {
task->super.status = status;
goto out;
}

/* perform a put to each member peer using the peer's index in the
* destination displacement. */
Expand All @@ -46,7 +53,7 @@ ucc_status_t ucc_tl_ucp_alltoallv_onesided_start(ucc_coll_task_t *ctask)

UCPCHECK_GOTO(ucc_tl_ucp_put_nb(PTR_OFFSET(src, sd_disp),
PTR_OFFSET(dest, dd_disp),
data_size, peer, team, task),
data_size, peer, mtype, team, task),
task, out);
UCPCHECK_GOTO(ucc_tl_ucp_atomic_inc(pSync, peer, team), task, out);
}
Expand All @@ -68,6 +75,7 @@ void ucc_tl_ucp_alltoallv_onesided_progress(ucc_coll_task_t *ctask)

pSync[0] = 0;
task->super.status = UCC_OK;
ucc_tl_ucp_coll_dynamic_segment_finalize(task);
}

ucc_status_t ucc_tl_ucp_alltoallv_onesided_init(ucc_base_coll_args_t *coll_args,
Expand Down Expand Up @@ -98,7 +106,12 @@ ucc_status_t ucc_tl_ucp_alltoallv_onesided_init(ucc_base_coll_args_t *coll_args,
*task_h = &task->super;
task->super.post = ucc_tl_ucp_alltoallv_onesided_start;
task->super.progress = ucc_tl_ucp_alltoallv_onesided_progress;
status = UCC_OK;

status = ucc_tl_ucp_coll_dynamic_segment_init(&coll_args->args, task);
if (UCC_OK != status) {
tl_error(UCC_TL_TEAM_LIB(tl_team),
"failed to initialize dynamic segments");
}
out:
return status;
}
18 changes: 15 additions & 3 deletions src/components/tl/ucp/tl_ucp.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@
#define ONESIDED_SYNC_SIZE 1
#define ONESIDED_REDUCE_SIZE 4

typedef enum {
UCC_TL_UCP_DYN_SEG_UPDATE_SRC = UCC_BIT(0),
UCC_TL_UCP_DYN_SEG_UPDATE_DST = UCC_BIT(1),
UCC_TL_UCP_DYN_SEG_UPDATE_GLOBAL = UCC_BIT(2),
} ucc_tl_ucp_dynamic_segment_update_mask_t;

typedef struct ucc_tl_ucp_iface {
ucc_tl_iface_t super;
} ucc_tl_ucp_iface_t;
Expand Down Expand Up @@ -120,11 +126,16 @@ typedef struct ucc_tl_ucp_context {
ucc_tl_ucp_worker_t service_worker;
uint32_t service_worker_throttling_count;
ucc_mpool_t req_mp;
ucc_tl_ucp_remote_info_t * remote_info;
ucc_tl_ucp_remote_info_t *remote_info;
ucp_rkey_h * rkeys;
uint64_t n_rinfo_segs;
uint64_t ucp_memory_types;
int topo_required;
ucc_tl_ucp_remote_info_t *dynamic_remote_info;
void *dyn_seg_buf;
ucp_rkey_h *dyn_rkeys;
size_t dyn_seg_size;
size_t n_dynrinfo_segs;
} ucc_tl_ucp_context_t;
UCC_CLASS_DECLARE(ucc_tl_ucp_context_t, const ucc_base_context_params_t *,
const ucc_base_config_t *);
Expand All @@ -135,8 +146,6 @@ typedef struct ucc_tl_ucp_team {
ucc_status_t status;
uint32_t seq_num;
ucc_tl_ucp_task_t *preconnect_task;
void * va_base[MAX_NR_SEGMENTS];
size_t base_length[MAX_NR_SEGMENTS];
ucc_tl_ucp_worker_t * worker;
ucc_tl_ucp_team_config_t cfg;
const char * tuning_str;
Expand Down Expand Up @@ -190,6 +199,9 @@ extern ucc_config_field_t ucc_tl_ucp_lib_config_table[];
#define UCC_TL_UCP_REMOTE_RKEY(_ctx, _rank, _seg) \
((_ctx)->rkeys[_rank * _ctx->n_rinfo_segs + _seg])

#define UCC_TL_UCP_DYN_REMOTE_RKEY(_ctx, _rank, _seg) \
((_ctx)->dyn_rkeys[_rank * _ctx->n_dynrinfo_segs + _seg])

extern ucs_memory_type_t ucc_memtype_to_ucs[UCC_MEMORY_TYPE_LAST+1];

void ucc_tl_ucp_pre_register_mem(ucc_tl_ucp_team_t *team, void *addr,
Expand Down
Loading
Loading