diff --git a/src/coll_patterns/recursive_knomial.h b/src/coll_patterns/recursive_knomial.h index 1888169f8b..228131a40b 100644 --- a/src/coll_patterns/recursive_knomial.h +++ b/src/coll_patterns/recursive_knomial.h @@ -23,6 +23,8 @@ enum { KN_PATTERN_ALLGATHER, KN_PATTERN_ALLGATHERV, KN_PATTERN_ALLGATHERX, + KN_PATTERN_GATHER, + KN_PATTERN_GATHERX, }; typedef struct ucc_knomial_pattern { @@ -83,7 +85,7 @@ static inline ucc_rank_t ucc_kn_pattern_radix_pow_init(ucc_knomial_pattern_t *p, static inline void ucc_knomial_pattern_init_impl(ucc_rank_t size, ucc_rank_t rank, ucc_kn_radix_t radix, ucc_knomial_pattern_t *p, - int backward) + int backward, int extra) { ucc_rank_t fs = radix; ucc_rank_t n_full_subtrees; @@ -100,7 +102,7 @@ ucc_knomial_pattern_init_impl(ucc_rank_t size, ucc_rank_t rank, p->backward = backward; p->iteration = 0; n_full_subtrees = ucc_kn_pattern_n_full(p); - p->n_extra = size - n_full_subtrees * p->full_pow_size; + p->n_extra = extra ? size - n_full_subtrees * p->full_pow_size : 0; p->n_iters = (p->n_extra && n_full_subtrees == 1) ? p->pow_radix_sup - 1 : p->pow_radix_sup; p->radix_pow = ucc_kn_pattern_radix_pow_init(p, backward); @@ -115,14 +117,21 @@ ucc_knomial_pattern_init_backward(ucc_rank_t size, ucc_rank_t rank, ucc_kn_radix_t radix, ucc_knomial_pattern_t *p) { - ucc_knomial_pattern_init_impl(size, rank, radix, p, 1); + ucc_knomial_pattern_init_impl(size, rank, radix, p, 1, 1); } static inline void ucc_knomial_pattern_init(ucc_rank_t size, ucc_rank_t rank, ucc_kn_radix_t radix, ucc_knomial_pattern_t *p) { - ucc_knomial_pattern_init_impl(size, rank, radix, p, 0); + ucc_knomial_pattern_init_impl(size, rank, radix, p, 0, 1); +} + +static inline void +ucc_knomial_pattern_init_no_extra(ucc_rank_t size, ucc_rank_t rank, ucc_kn_radix_t radix, + ucc_knomial_pattern_t *p) +{ + ucc_knomial_pattern_init_impl(size, rank, radix, p, 0, 0); } static inline ucc_rank_t @@ -186,6 +195,23 @@ ucc_knomial_pattern_get_loop_peer(ucc_knomial_pattern_t *p, ucc_rank_t rank, ucc_knomial_pattern_loop_rank_inv(p, peer); } +static inline ucc_rank_t +ucc_knomial_pattern_get_base_rank(ucc_knomial_pattern_t *p, ucc_rank_t rank) +{ + ucc_rank_t step_size = p->radix_pow * p->radix; + ucc_rank_t lrank; + ucc_kn_radix_t s; + + lrank = ucc_knomial_pattern_loop_rank(p, rank); + s = ucc_div_round_up(step_size - (lrank % step_size), p->radix_pow); + + if (s == p->radix) { + return rank; + } else { + return ucc_knomial_pattern_get_loop_peer(p, rank, s); + } +} + static inline void ucc_knomial_pattern_next_iteration(ucc_knomial_pattern_t *p) { @@ -224,11 +250,13 @@ static inline ucc_rank_t ucc_knomial_calc_recv_dist(ucc_rank_t team_size, ucc_rank_t rank, ucc_rank_t radix, ucc_rank_t root) { + ucc_rank_t root_base = 0 ; + ucc_rank_t dist = 1; + if (rank == root) { return 0; } - ucc_rank_t root_base = 0 ; - ucc_rank_t dist = 1; + while (dist <= team_size) { if (rank < root_base + radix * dist) { break; diff --git a/src/coll_patterns/sra_knomial.h b/src/coll_patterns/sra_knomial.h index 11b99dcf53..2d0362b77d 100644 --- a/src/coll_patterns/sra_knomial.h +++ b/src/coll_patterns/sra_knomial.h @@ -196,6 +196,60 @@ ucc_kn_ag_pattern_init(ucc_rank_t size, ucc_rank_t rank, ucc_kn_radix_t radix, p->block_size; } +static inline void +ucc_kn_g_pattern_init(ucc_rank_t size, ucc_rank_t rank, ucc_kn_radix_t radix, + size_t count, ucc_knomial_pattern_t *p) +{ + ucc_knomial_pattern_init_no_extra(size, rank, radix, p); + p->type = KN_PATTERN_GATHER; + p->count = count; + p->block_size = p->radix_pow * radix; + 
p->block_offset = ucc_knomial_pattern_loop_rank(p, rank) / p->block_size * + p->block_size; +} + +static inline void +ucc_kn_gx_pattern_init(ucc_rank_t size, ucc_rank_t rank, ucc_kn_radix_t radix, + size_t count, ucc_knomial_pattern_t *p) +{ + ucc_knomial_pattern_init_backward(size, rank, radix, p); + p->type = KN_PATTERN_GATHERX; + p->count = count; + if (p->node_type != KN_NODE_EXTRA) { + p->block_size = ucc_kn_compute_step_radix(p); + ucc_knx_block(rank, size, radix, count, p->n_iters - 1, + &p->block_size_counts, &p->block_offset); + + } + +} + +static inline void +ucc_kn_g_pattern_peer_seg(ucc_rank_t peer, ucc_knomial_pattern_t *p, + size_t *seg_count, ptrdiff_t *seg_offset) +{ + ucc_rank_t step_radix, seg_index; + + *seg_count = 0; + *seg_offset = 0; + switch (p->type) { + case KN_PATTERN_GATHER: + *seg_count = ucc_min(p->radix_pow, p->size - peer) * (p->count / p->size); + *seg_offset = peer * (p->count / p->size); + return; + case KN_PATTERN_GATHERX: + step_radix = ucc_kn_compute_step_radix(p); + seg_index = ucc_kn_compute_seg_index(peer, p->radix_pow, p); + *seg_offset = ucc_buffer_block_offset(p->block_size_counts, step_radix, + seg_index) + p->block_offset; + *seg_count = ucc_buffer_block_count(p->block_size_counts, step_radix, + seg_index); + return; + default: + ucc_assert(0); + } +} + static inline void ucc_kn_agx_pattern_init(ucc_rank_t size, ucc_rank_t rank, ucc_kn_radix_t radix, size_t count, ucc_knomial_pattern_t *p) @@ -283,6 +337,28 @@ static inline void ucc_kn_ag_pattern_next_iter(ucc_knomial_pattern_t *p) } } +static inline void ucc_kn_g_pattern_next_iter(ucc_knomial_pattern_t *p) +{ + ucc_rank_t rank; + if (p->type == KN_PATTERN_GATHERX) { + ucc_knomial_pattern_next_iteration_backward(p); + + if (!ucc_knomial_pattern_loop_done(p)) { + ucc_knx_block(p->rank, p->size, p->radix, p->count, + p->n_iters - 1 - p->iteration, + &p->block_size_counts, &p->block_offset); + } + } else { + rank = ucc_knomial_pattern_loop_rank(p, p->rank); + ucc_knomial_pattern_next_iteration(p); + + if (!ucc_knomial_pattern_loop_done(p)) { + p->block_size *= ucc_kn_compute_step_radix(p); + p->block_offset = rank / p->block_size * p->block_size; + } + } +} + static inline void ucc_kn_rs_pattern_init(ucc_rank_t size, ucc_rank_t rank, ucc_kn_radix_t radix, size_t count, ucc_knomial_pattern_t *p) diff --git a/src/components/cl/hier/bcast/bcast_2step.c b/src/components/cl/hier/bcast/bcast_2step.c index c2ea59b755..824accfbbf 100644 --- a/src/components/cl/hier/bcast/bcast_2step.c +++ b/src/components/cl/hier/bcast/bcast_2step.c @@ -93,6 +93,7 @@ find_root_node_rank(ucc_rank_t root, ucc_cl_hier_team_t *cl_team) return UCC_RANK_INVALID; } + static ucc_status_t ucc_cl_hier_bcast_2step_init_schedule(ucc_base_coll_args_t *coll_args, ucc_base_team_t *team, diff --git a/src/components/tl/ucp/Makefile.am b/src/components/tl/ucp/Makefile.am index 02270feaa7..b9522baeb4 100644 --- a/src/components/tl/ucp/Makefile.am +++ b/src/components/tl/ucp/Makefile.am @@ -78,11 +78,12 @@ gatherv = \ gatherv/gatherv.c \ gatherv/gatherv_linear.c -reduce = \ - reduce/reduce.h \ - reduce/reduce.c \ - reduce/reduce_knomial.c \ - reduce/reduce_dbt.c +reduce = \ + reduce/reduce.h \ + reduce/reduce.c \ + reduce/reduce_knomial.c \ + reduce/reduce_dbt.c \ + reduce/reduce_srg_knomial.c reduce_scatter = \ reduce_scatter/reduce_scatter.h \ diff --git a/src/components/tl/ucp/allreduce/allreduce_sliding_window.c b/src/components/tl/ucp/allreduce/allreduce_sliding_window.c index a1ba003bd3..c8ec600987 100644 --- 
a/src/components/tl/ucp/allreduce/allreduce_sliding_window.c +++ b/src/components/tl/ucp/allreduce/allreduce_sliding_window.c @@ -151,7 +151,7 @@ ucc_tl_ucp_allreduce_sliding_window_rdma_task_post( ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); ucc_tl_ucp_team_t *team = TASK_TEAM(task); - + ucc_tl_ucp_task_reset(task, UCC_INPROGRESS); return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super); @@ -325,9 +325,9 @@ void ucc_tl_ucp_allreduce_sliding_window_rdma_progress(ucc_coll_task_t *coll_tas ucc_tl_ucp_allreduce_sw_buf_t *accbuf = &pipe->accbuf; ucp_request_param_t req_param = {0}; int i = 0; - ucc_coll_task_t *allgather_task = + ucc_coll_task_t *allgather_task = task->allreduce_sliding_window.allgather_task; - ucc_ee_executor_task_t **reduce_task = + ucc_ee_executor_task_t **reduce_task = &task->allreduce_sliding_window.reduce_task; ucc_rank_t put_window_size = UCC_TL_UCP_TEAM_LIB(tl_team)-> @@ -490,7 +490,7 @@ void ucc_tl_ucp_allreduce_sliding_window_rdma_progress(ucc_coll_task_t *coll_tas ucp_worker_fence(tl_ctx->worker.ucp_worker); ucc_tl_ucp_get_ep(tl_team, dst_rank, &ep); - task->allreduce_sliding_window.put_requests[put_idx] = + task->allreduce_sliding_window.put_requests[put_idx] = ucp_put_nbx( ep, src_addr, data_size, (uint64_t)dst_addr, diff --git a/src/components/tl/ucp/gather/gather.c b/src/components/tl/ucp/gather/gather.c index d4748a8025..e9eceac680 100644 --- a/src/components/tl/ucp/gather/gather.c +++ b/src/components/tl/ucp/gather/gather.c @@ -17,62 +17,13 @@ ucc_base_coll_alg_info_t [UCC_TL_UCP_GATHER_ALG_LAST] = { .id = 0, .name = NULL, .desc = NULL}}; -static inline uint32_t calc_buffer_size(ucc_rank_t rank, uint32_t radix, ucc_rank_t team_size) -{ - uint32_t radix_valuation; - - if (rank == 0) { - return team_size; - } - radix_valuation = calc_valuation(rank, radix); - return (uint32_t)ucc_min(pow(radix, radix_valuation), team_size - rank); -} - ucc_status_t ucc_tl_ucp_gather_init(ucc_tl_ucp_task_t *task) { - ucc_coll_args_t * args = &TASK_ARGS(task); - ucc_tl_ucp_team_t *team = TASK_TEAM(task); - ucc_rank_t myrank = UCC_TL_TEAM_RANK(team); - ucc_rank_t team_size = UCC_TL_TEAM_SIZE(team); - ucc_rank_t root = args->root; - ucc_rank_t vrank = (myrank - root + team_size) % team_size; - ucc_status_t status = UCC_OK; - ucc_memory_type_t mtype; - ucc_datatype_t dt; - size_t count, data_size; - uint32_t buffer_size; - int isleaf; - - if (root == myrank) { - count = args->dst.info.count; - dt = args->dst.info.datatype; - mtype = args->dst.info.mem_type; - } else { - count = args->src.info.count; - dt = args->src.info.datatype; - mtype = args->src.info.mem_type; - } - data_size = count * ucc_dt_size(dt); - task->super.post = ucc_tl_ucp_gather_knomial_start; - task->super.progress = ucc_tl_ucp_gather_knomial_progress; - task->super.finalize = ucc_tl_ucp_gather_knomial_finalize; - task->gather_kn.radix = - ucc_min(UCC_TL_UCP_TEAM_LIB(team)->cfg.gather_kn_radix, team_size); - CALC_KN_TREE_DIST(team_size, task->gather_kn.radix, - task->gather_kn.max_dist); - isleaf = (vrank % task->gather_kn.radix != 0 || vrank == team_size - 1); - task->gather_kn.scratch_mc_header = NULL; + ucc_tl_ucp_team_t *team = TASK_TEAM(task); + ucc_rank_t size = UCC_TL_TEAM_SIZE(team); + ucc_kn_radix_t radix; - if (vrank == 0) { - task->gather_kn.scratch = args->dst.info.buffer; - } else if (isleaf) { - task->gather_kn.scratch = args->src.info.buffer; - } else { - buffer_size = calc_buffer_size(vrank, task->gather_kn.radix, team_size); - status = 
ucc_mc_alloc(&task->gather_kn.scratch_mc_header, - buffer_size * data_size, mtype); - task->gather_kn.scratch = task->gather_kn.scratch_mc_header->addr; - } + radix = ucc_min(UCC_TL_UCP_TEAM_LIB(team)->cfg.gather_kn_radix, size); - return status; + return ucc_tl_ucp_gather_knomial_init_common(task, radix); } diff --git a/src/components/tl/ucp/gather/gather.h b/src/components/tl/ucp/gather/gather.h index 26a3df4138..644f473493 100644 --- a/src/components/tl/ucp/gather/gather.h +++ b/src/components/tl/ucp/gather/gather.h @@ -45,4 +45,12 @@ void ucc_tl_ucp_gather_knomial_progress(ucc_coll_task_t *task); ucc_status_t ucc_tl_ucp_gather_knomial_finalize(ucc_coll_task_t *task); +ucc_status_t ucc_tl_ucp_gather_knomial_init_common(ucc_tl_ucp_task_t *task, + ucc_kn_radix_t radix); + +/* Internal interface with custom radix */ +ucc_status_t ucc_tl_ucp_gather_knomial_init_r(ucc_base_coll_args_t *coll_args, + ucc_base_team_t *team, + ucc_coll_task_t **task_h, + ucc_kn_radix_t radix); #endif diff --git a/src/components/tl/ucp/gather/gather_knomial.c b/src/components/tl/ucp/gather/gather_knomial.c index ab6ff8933e..75010e186b 100644 --- a/src/components/tl/ucp/gather/gather_knomial.c +++ b/src/components/tl/ucp/gather/gather_knomial.c @@ -1,5 +1,5 @@ /** - * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -9,135 +9,195 @@ #include "core/ucc_progress_queue.h" #include "tl_ucp_sendrecv.h" #include "utils/ucc_math.h" +#include "coll_patterns/sra_knomial.h" #define SAVE_STATE(_phase) \ do { \ task->gather_kn.phase = _phase; \ } while (0) +static inline uint32_t calc_buffer_size(ucc_rank_t trank, uint32_t radix, + ucc_rank_t tsize) +{ + uint32_t radix_valuation; + + if (trank == 0) { + return tsize; + } + + radix_valuation = calc_valuation(trank, radix); + return (uint32_t)ucc_min(pow(radix, radix_valuation), tsize - trank); +} + void ucc_tl_ucp_gather_knomial_progress(ucc_coll_task_t *coll_task) { - ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); - ucc_coll_args_t * args = &TASK_ARGS(task); - ucc_tl_ucp_team_t *team = TASK_TEAM(task); - ucc_rank_t team_size = UCC_TL_TEAM_SIZE(team); - ucc_rank_t rank = UCC_TL_TEAM_RANK(team); - ucc_rank_t size = UCC_TL_TEAM_SIZE(team); - ucc_rank_t root = (ucc_rank_t)args->root; - uint32_t radix = task->gather_kn.radix; - ucc_rank_t vrank = (rank - root + size) % size; - ucc_memory_type_t mtype = args->src.info.mem_type; - ucc_status_t status = UCC_OK; - size_t data_size = - args->src.info.count * ucc_dt_size(args->src.info.datatype); - size_t msg_size, msg_count; - void * scratch_offset; - ucc_rank_t vpeer, peer, vroot_at_level, root_at_level, pos; - uint32_t i; + ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); + ucc_coll_args_t *args = &TASK_ARGS(task); + ucc_tl_ucp_team_t *team = TASK_TEAM(task); + ucc_rank_t tsize = UCC_TL_TEAM_SIZE(team); + ucc_rank_t rank = UCC_TL_TEAM_RANK(team); + ucc_rank_t root = (ucc_rank_t)args->root; + uint32_t radix = task->gather_kn.radix; + ucc_rank_t vrank = VRANK(rank, root, tsize); + ucc_memory_type_t mtype = args->src.info.mem_type; + ucc_status_t status = UCC_OK; + ucc_knomial_pattern_t *p = &task->gather_kn.p; + size_t dt_size = ucc_dt_size(args->src.info.datatype); + size_t data_size = args->src.info.count * dt_size; + ucc_coll_type_t ct = args->coll_type; + size_t msg_size, peer_seg_count; + void *scratch_offset; + ucc_rank_t vpeer, peer, 
vroot_at_level, root_at_level, num_blocks; + ucc_kn_radix_t loop_step; + ptrdiff_t peer_seg_offset; -UCC_GATHER_KN_PHASE_PROGRESS: - if (UCC_INPROGRESS == ucc_tl_ucp_test(task)) { + if (task->gather_kn.p.node_type == KN_NODE_EXTRA) { + ucc_assert(ct == UCC_COLL_TYPE_REDUCE); + task->super.status = UCC_OK; return; } UCC_GATHER_KN_GOTO_PHASE(task->gather_kn.phase); - UCC_GATHER_KN_PHASE_INIT: - while (task->gather_kn.dist <= task->gather_kn.max_dist) { + while (!ucc_knomial_pattern_loop_done(p)) { + if (task->tagged.send_posted > 0) { + goto UCC_GATHER_KN_PHASE_PROGRESS; + } + scratch_offset = task->gather_kn.scratch; - if (vrank % task->gather_kn.dist == 0) { - pos = (vrank / task->gather_kn.dist) % radix; - if (pos == 0) { - for (i = 1; i < radix; i++) { - vpeer = vrank + i * task->gather_kn.dist; - msg_count = ucc_min(task->gather_kn.dist, team_size - vpeer); - if (vpeer >= size) { - break; - } else if (vrank != 0) { - msg_size = data_size * msg_count; - scratch_offset = PTR_OFFSET( - scratch_offset, data_size * task->gather_kn.dist); - peer = (vpeer + root) % size; - UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(scratch_offset, - msg_size, mtype, peer, - team, task), - task, out); - } else { //The root is a particular case because it must aggregate the data sorted by ranks - peer = (vpeer + root) % size; + vroot_at_level = ucc_knomial_pattern_get_base_rank(p, vrank); + if (vroot_at_level == vrank) { + for (loop_step = 1; loop_step < radix; loop_step++) { + vpeer = ucc_knomial_pattern_get_loop_peer(p, vrank, loop_step); + if (vpeer == UCC_KN_PEER_NULL) { + continue; + } + ucc_kn_g_pattern_peer_seg(vpeer, p, &peer_seg_count, + &peer_seg_offset); + peer = INV_VRANK(vpeer, root, tsize); + if (vrank != 0) { + msg_size = peer_seg_count * dt_size; + if (args->coll_type != UCC_COLL_TYPE_GATHER) { scratch_offset = PTR_OFFSET(task->gather_kn.scratch, - data_size * peer); - // check if received data correspond to contiguous ranks - if (msg_count <= team_size - peer) { - msg_size = data_size * msg_count; - UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(scratch_offset, - msg_size, mtype, - peer, team, task), - task, out); - } else { // in this case, data must be split in two at the destination buffer - msg_size = data_size * (team_size - peer); - UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(scratch_offset, - msg_size, mtype, - peer, team, task), - task, out); - - msg_size = - data_size * (msg_count - (team_size - peer)); - UCPCHECK_GOTO(ucc_tl_ucp_recv_nb( - task->gather_kn.scratch, msg_size, - mtype, peer, team, task), - task, out); + peer_seg_offset * dt_size); + } else { + scratch_offset = PTR_OFFSET(scratch_offset, + data_size * + task->gather_kn.dist); + } + UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(scratch_offset, + msg_size, mtype, peer, + team, task), + task, out); + } else { + /* + the root is a particular case because it must aggregate + the data sorted by ranks + */ + scratch_offset = PTR_OFFSET(task->gather_kn.scratch, + data_size * peer); + num_blocks = ucc_min(task->gather_kn.dist, tsize - vpeer); + /* check if received data correspond to contiguous ranks */ + if ((ct == UCC_COLL_TYPE_REDUCE) || + (num_blocks <= tsize - peer)) { + msg_size = peer_seg_count * dt_size; + if (args->coll_type != UCC_COLL_TYPE_GATHER) { + scratch_offset = PTR_OFFSET(task->gather_kn.scratch, + peer_seg_offset * dt_size); } + UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(scratch_offset, + msg_size, mtype, + peer, team, task), + task, out); + } else { + /* + in this case, data must be split in two + at the destination buffer + */ + msg_size = data_size * (tsize - peer); 
+ UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(scratch_offset, + msg_size, mtype, + peer, team, task), + task, out); + msg_size = data_size * (num_blocks - (tsize - peer)); + UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(task->gather_kn.scratch, + msg_size, mtype, + peer, team, task), + task, out); } } + } - if (task->gather_kn.dist == 1) { //check if first passage - msg_size = data_size; - if (rank != root) { - status = ucc_mc_memcpy(task->gather_kn.scratch, - args->src.info.buffer, msg_size, - args->src.info.mem_type, mtype); - } else if (!UCC_IS_INPLACE(*args)) { - status = ucc_mc_memcpy( - PTR_OFFSET(task->gather_kn.scratch, data_size * rank), - args->src.info.buffer, msg_size, - args->src.info.mem_type, mtype); - } + if ((ct != UCC_COLL_TYPE_REDUCE) && + ucc_knomial_pattern_loop_first_iteration(p)) { + msg_size = data_size; + if (rank != root) { + status = ucc_mc_memcpy(task->gather_kn.scratch, + args->src.info.buffer, msg_size, + args->src.info.mem_type, mtype); + } else if (!UCC_IS_INPLACE(*args)) { + status = ucc_mc_memcpy( + PTR_OFFSET(task->gather_kn.scratch, data_size * rank), + args->src.info.buffer, msg_size, + args->src.info.mem_type, mtype); + } - if (ucc_unlikely(UCC_OK != status)) { - task->super.status = status; - return; - } + if (ucc_unlikely(UCC_OK != status)) { + task->super.status = status; + return; } } else { - vroot_at_level = vrank - pos * task->gather_kn.dist; - root_at_level = (vroot_at_level + root) % size; - msg_count = ucc_min(task->gather_kn.dist, - team_size - vrank); - msg_size = data_size * msg_count; - if (root_at_level != root || msg_count <= team_size - rank) { - UCPCHECK_GOTO(ucc_tl_ucp_send_nb(task->gather_kn.scratch, - msg_size, mtype, - root_at_level, team, task), - task, out); + if (rank == root && ucc_knomial_pattern_loop_first_iteration(p) && !UCC_IS_INPLACE(*args)) { + ucc_kn_g_pattern_peer_seg(vrank, p, &peer_seg_count, + &peer_seg_offset); + status = ucc_mc_memcpy( + PTR_OFFSET(task->gather_kn.scratch, peer_seg_offset * dt_size), + PTR_OFFSET(args->src.info.buffer, peer_seg_offset * dt_size), peer_seg_count * dt_size, + args->src.info.mem_type, mtype); + } + } + } else { + root_at_level = INV_VRANK(vroot_at_level, root, tsize); + num_blocks = ucc_min(task->gather_kn.dist, tsize - vrank); + if ((ct == UCC_COLL_TYPE_REDUCE) || + (root_at_level != root) || + (num_blocks <= tsize - rank)) { + ucc_kn_g_pattern_peer_seg(vrank, p, &peer_seg_count, + &peer_seg_offset); + msg_size = peer_seg_count * dt_size; + if (args->coll_type == UCC_COLL_TYPE_GATHER) { + scratch_offset = task->gather_kn.scratch; } else { - msg_size = data_size * (team_size - rank); - UCPCHECK_GOTO(ucc_tl_ucp_send_nb(task->gather_kn.scratch, - msg_size, mtype, - root_at_level, team, task), - task, out); - msg_size = data_size * (msg_count - (team_size - rank)); - UCPCHECK_GOTO( - ucc_tl_ucp_send_nb( - PTR_OFFSET(task->gather_kn.scratch, - data_size * (team_size - rank)), - msg_size, mtype, root_at_level, team, task), - task, out); + scratch_offset = PTR_OFFSET(task->gather_kn.scratch, + peer_seg_offset * dt_size); } + UCPCHECK_GOTO(ucc_tl_ucp_send_nb(scratch_offset, + msg_size, mtype, + root_at_level, team, task), + task, out); + } else { + msg_size = data_size * (tsize - rank); + UCPCHECK_GOTO(ucc_tl_ucp_send_nb(task->gather_kn.scratch, + msg_size, mtype, + root_at_level, team, task), + task, out); + msg_size = data_size * (num_blocks - (tsize - rank)); + UCPCHECK_GOTO( + ucc_tl_ucp_send_nb(PTR_OFFSET(task->gather_kn.scratch, + data_size * (tsize - rank)), + msg_size, mtype, root_at_level, team, + task), 
+ task, out); } } + +UCC_GATHER_KN_PHASE_PROGRESS: + if (UCC_INPROGRESS == ucc_tl_ucp_test(task)) { + SAVE_STATE(UCC_GATHER_KN_PHASE_PROGRESS); + return; + } task->gather_kn.dist *= radix; - SAVE_STATE(UCC_GATHER_KN_PHASE_INIT); - goto UCC_GATHER_KN_PHASE_PROGRESS; + ucc_kn_g_pattern_next_iter(p); } ucc_assert(UCC_TL_UCP_TASK_P2P_COMPLETE(task)); @@ -149,18 +209,29 @@ void ucc_tl_ucp_gather_knomial_progress(ucc_coll_task_t *coll_task) ucc_status_t ucc_tl_ucp_gather_knomial_start(ucc_coll_task_t *coll_task) { - ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); - ucc_coll_args_t * args = &TASK_ARGS(task); - ucc_tl_ucp_team_t *team = TASK_TEAM(task); - ucc_rank_t root = (ucc_rank_t)args->root; - ucc_rank_t rank = UCC_TL_TEAM_RANK(team); - ucc_rank_t size = UCC_TL_TEAM_SIZE(team); + ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); + ucc_coll_args_t *args = &TASK_ARGS(task); + ucc_tl_ucp_team_t *team = TASK_TEAM(task); + ucc_rank_t root = (ucc_rank_t)args->root; + ucc_rank_t trank = UCC_TL_TEAM_RANK(team); + ucc_rank_t size = UCC_TL_TEAM_SIZE(team); - if (root == rank && UCC_IS_INPLACE(*args)) { + if (root == trank && UCC_IS_INPLACE(*args)) { args->src.info = args->dst.info; args->src.info.count = args->dst.info.count / size; } + if (args->coll_type == UCC_COLL_TYPE_GATHER) { + ucc_kn_g_pattern_init(size, VRANK(trank, root, size), + task->gather_kn.radix, args->src.info.count * size, + &task->gather_kn.p); + } else { + task->gather_kn.scratch = args->dst.info.buffer; + ucc_kn_gx_pattern_init(size, VRANK(trank, root, size), + task->gather_kn.radix, args->dst.info.count, + &task->gather_kn.p); + } + UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_gather_kn_start", 0); ucc_tl_ucp_task_reset(task, UCC_INPROGRESS); @@ -179,3 +250,104 @@ ucc_status_t ucc_tl_ucp_gather_knomial_finalize(ucc_coll_task_t *coll_task) } return ucc_tl_ucp_coll_finalize(coll_task); } + +ucc_status_t ucc_tl_ucp_gather_knomial_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t *team, + ucc_coll_task_t **task_h) +{ + ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t); + ucc_rank_t tsize = UCC_TL_TEAM_SIZE(tl_team); + ucc_tl_ucp_task_t *task; + ucc_status_t status; + ucc_kn_radix_t radix; + + task = ucc_tl_ucp_init_task(coll_args, team); + if (ucc_unlikely(!task)) { + return UCC_ERR_NO_MEMORY; + } + + radix = ucc_min(UCC_TL_UCP_TEAM_LIB(tl_team)->cfg.gather_kn_radix, tsize); + + status = ucc_tl_ucp_gather_knomial_init_common(task, radix); + if (ucc_unlikely(status != UCC_OK)) { + ucc_tl_ucp_put_task(task); + return status; + } + *task_h = &task->super; + return UCC_OK; +} + +ucc_status_t ucc_tl_ucp_gather_knomial_init_r(ucc_base_coll_args_t *coll_args, + ucc_base_team_t *team, + ucc_coll_task_t **task_h, + ucc_kn_radix_t radix) +{ + ucc_tl_ucp_task_t *task; + ucc_status_t status; + + task = ucc_tl_ucp_init_task(coll_args, team); + if (ucc_unlikely(!task)) { + return UCC_ERR_NO_MEMORY; + } + + status = ucc_tl_ucp_gather_knomial_init_common(task, radix); + if (ucc_unlikely(status != UCC_OK)) { + ucc_tl_ucp_put_task(task); + return status; + } + *task_h = &task->super; + return UCC_OK; +} + +ucc_status_t ucc_tl_ucp_gather_knomial_init_common(ucc_tl_ucp_task_t *task, + ucc_kn_radix_t radix) +{ + ucc_tl_ucp_team_t *team = TASK_TEAM(task); + ucc_rank_t trank = UCC_TL_TEAM_RANK(team); + ucc_rank_t tsize = UCC_TL_TEAM_SIZE(team); + ucc_coll_args_t *args = &TASK_ARGS(task); + ucc_rank_t root = args->root; + ucc_rank_t vrank = VRANK(trank, root, tsize); + ucc_status_t 
status = UCC_OK; + ucc_memory_type_t mtype; + ucc_datatype_t dt; + size_t count, data_size; + uint32_t buffer_size; + int isleaf; + + if (UCC_IS_ROOT(*args, trank)) { + count = args->dst.info.count; + dt = args->dst.info.datatype; + mtype = args->dst.info.mem_type; + } else { + count = args->src.info.count; + dt = args->src.info.datatype; + mtype = args->src.info.mem_type; + } + data_size = count * ucc_dt_size(dt); + task->super.post = ucc_tl_ucp_gather_knomial_start; + task->super.progress = ucc_tl_ucp_gather_knomial_progress; + task->super.finalize = ucc_tl_ucp_gather_knomial_finalize; + task->gather_kn.radix = radix; + CALC_KN_TREE_DIST(tsize, task->gather_kn.radix, + task->gather_kn.max_dist); + task->gather_kn.scratch_mc_header = NULL; + + if (args->coll_type == UCC_COLL_TYPE_REDUCE) { + task->gather_kn.scratch = args->dst.info.buffer; + } else { + isleaf = ((vrank % radix != 0) || (vrank == tsize - 1)); + if (vrank == 0) { + task->gather_kn.scratch = args->dst.info.buffer; + } else if (isleaf) { + task->gather_kn.scratch = args->src.info.buffer; + } else { + buffer_size = calc_buffer_size(vrank, task->gather_kn.radix, tsize); + status = ucc_mc_alloc(&task->gather_kn.scratch_mc_header, + buffer_size * data_size, mtype); + task->gather_kn.scratch = task->gather_kn.scratch_mc_header->addr; + } + } + + return status; +} diff --git a/src/components/tl/ucp/reduce/reduce.c b/src/components/tl/ucp/reduce/reduce.c index 8a9fb3b74b..9d1bd67048 100644 --- a/src/components/tl/ucp/reduce/reduce.c +++ b/src/components/tl/ucp/reduce/reduce.c @@ -18,6 +18,11 @@ ucc_base_coll_alg_info_t .name = "dbt", .desc = "reduce over double binary tree where a leaf in one tree " "will be intermediate in other (optimized for BW)"}, + [UCC_TL_UCP_REDUCE_ALG_SRG] = + {.id = UCC_TL_UCP_REDUCE_ALG_SRG, + .name = "srg", + .desc = "recursive knomial scatter-reduce followed by knomial " + "gather"}, [UCC_TL_UCP_REDUCE_ALG_LAST] = { .id = 0, .name = NULL, .desc = NULL}}; diff --git a/src/components/tl/ucp/reduce/reduce.h b/src/components/tl/ucp/reduce/reduce.h index 98bc183ff3..e8f20fca67 100644 --- a/src/components/tl/ucp/reduce/reduce.h +++ b/src/components/tl/ucp/reduce/reduce.h @@ -10,6 +10,7 @@ enum { UCC_TL_UCP_REDUCE_ALG_KNOMIAL, UCC_TL_UCP_REDUCE_ALG_DBT, + UCC_TL_UCP_REDUCE_ALG_SRG, UCC_TL_UCP_REDUCE_ALG_LAST }; @@ -17,7 +18,7 @@ extern ucc_base_coll_alg_info_t ucc_tl_ucp_reduce_algs[UCC_TL_UCP_REDUCE_ALG_LAST + 1]; #define UCC_TL_UCP_REDUCE_DEFAULT_ALG_SELECT_STR \ - "reduce:0-inf:@0" + "reduce:0-32K:@0#reduce:32K-inf:@2" /* A set of convenience macros used to implement sw based progress of the reduce algorithm that uses kn pattern */ @@ -55,8 +56,8 @@ static inline int ucc_tl_ucp_reduce_alg_from_str(const char *str) ucc_status_t ucc_tl_ucp_reduce_init(ucc_tl_ucp_task_t *task); ucc_status_t ucc_tl_ucp_reduce_knomial_init(ucc_base_coll_args_t *coll_args, - ucc_base_team_t *team, - ucc_coll_task_t **task_h); + ucc_base_team_t *team, + ucc_coll_task_t **task_h); ucc_status_t ucc_tl_ucp_reduce_knomial_start(ucc_coll_task_t *task); @@ -65,7 +66,10 @@ void ucc_tl_ucp_reduce_knomial_progress(ucc_coll_task_t *task); ucc_status_t ucc_tl_ucp_reduce_knomial_finalize(ucc_coll_task_t *task); ucc_status_t ucc_tl_ucp_reduce_dbt_init(ucc_base_coll_args_t *coll_args, - ucc_base_team_t *team, - ucc_coll_task_t **task_h); + ucc_base_team_t *team, + ucc_coll_task_t **task_h); +ucc_status_t ucc_tl_ucp_reduce_srg_knomial_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t *team, + ucc_coll_task_t **task_h); #endif diff --git 
a/src/components/tl/ucp/reduce/reduce_srg_knomial.c b/src/components/tl/ucp/reduce/reduce_srg_knomial.c new file mode 100644 index 0000000000..7ded90453d --- /dev/null +++ b/src/components/tl/ucp/reduce/reduce_srg_knomial.c @@ -0,0 +1,334 @@ +/** + * Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See file LICENSE for terms. + */ + +#include "config.h" +#include "reduce.h" +#include "core/ucc_progress_queue.h" +#include "tl_ucp_sendrecv.h" +#include "coll_patterns/sra_knomial.h" +#include "utils/ucc_math.h" +#include "utils/ucc_coll_utils.h" +#include "components/mc/ucc_mc.h" +#include "../reduce_scatter/reduce_scatter.h" +#include "../gather/gather.h" +#include "../allgather/allgather.h" + +/* SRG - scatter-reduce-gather knomial algorithm + 1. The algorithm performs the collective reduce operation as a sequence of + K-nomial Reduce-Scatter followed by K-nomial (with the same radix K) + gather. + 2. In essence this is an extension of the Bi-nomial SRA algorithm + proposed by Rabenseifner2004 (https://doi.org/10.1007/978-3-540-24685-5_1). + The extension adds support for an arbitrary radix. + 3. The algorithm targets large message sizes (i.e., optimized for max bandwidth). + 4. If the number of ranks in the team cannot form a full radix subtree + (for radix=2 this means the team size is not a power of 2) then there will be + "extra" ranks which don't participate in the main exchange loop. They + will send the data to their "proxy" ranks in the beginning and then wait + for the response with the final data. + 5. The knomial reduce-scatter and gather primitives can be used separately. + However, if they are used together as part of SRG reduce, one has to + provide the same radix for both routines. + 6. After the completion of the reduce-scatter phase the local result (at non-EXTRA + ranks) will be located in the dst buffer at the offset that can be computed by the + routine from coll_patterns/sra_knomial.h: ucc_sra_kn_get_offset.
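+ 7. Example: with radix 2 and a team of 6 ranks, the largest full radix + subtree spans 4 ranks, so 2 ranks act as "extra" ranks (see point 4) + while the remaining 4 run the main exchange loop.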
+ */ + +static ucc_status_t +ucc_tl_ucp_reduce_srg_knomial_frag_start(ucc_coll_task_t *task) +{ + return ucc_schedule_start(task); +} + +static ucc_status_t +ucc_tl_ucp_reduce_srg_knomial_frag_finalize(ucc_coll_task_t *task) +{ + ucc_schedule_t *schedule = ucc_derived_of(task, ucc_schedule_t); + ucc_status_t status; + + status = ucc_schedule_finalize(task); + ucc_tl_ucp_put_schedule(schedule); + return status; +} + +static ucc_status_t +ucc_tl_ucp_reduce_srg_knomial_frag_setup(ucc_schedule_pipelined_t *schedule_p, + ucc_schedule_t *frag, int frag_num) +{ + int n_frags = schedule_p->super.n_tasks; + ucc_coll_args_t *args = &schedule_p->super.super.bargs.args; + size_t dt_size; + size_t count; + size_t frag_count; + size_t offset; + ucc_coll_args_t *targs; + ucc_rank_t rank; + ucc_tl_ucp_team_t *team; + + team = TASK_TEAM(&schedule_p->super); + rank = UCC_TL_TEAM_RANK(team); + if (UCC_IS_ROOT(*args, rank)) { + count = args->dst.info.count; + dt_size = ucc_dt_size(args->dst.info.datatype); + } else { + count = args->src.info.count; + dt_size = ucc_dt_size(args->src.info.datatype); + } + frag_count = ucc_buffer_block_count(count, n_frags, frag_num); + offset = ucc_buffer_block_offset(count, n_frags, frag_num); + + targs = &frag->tasks[0]->bargs.args; /* REDUCE_SCATTER */ + targs->src.info.buffer = PTR_OFFSET(targs->src.info.buffer, offset * dt_size); + targs->src.info.count = frag_count; + targs->dst.info.buffer = PTR_OFFSET(targs->dst.info.buffer, offset * dt_size); + targs->dst.info.count = frag_count; + + targs = &frag->tasks[1]->bargs.args; /* GATHER */ + targs->src.info.buffer = PTR_OFFSET(targs->src.info.buffer, offset * dt_size); + targs->src.info.count = 0; + targs->dst.info.buffer = PTR_OFFSET(targs->dst.info.buffer, offset * dt_size); + targs->dst.info.count = frag_count; + + return UCC_OK; +} + +static ucc_status_t +ucc_tl_ucp_reduce_srg_knomial_frag_init(ucc_base_coll_args_t *coll_args, + ucc_schedule_pipelined_t *sp, + ucc_base_team_t *team, + ucc_schedule_t **frag_p) +{ + ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t); + ucc_base_coll_args_t args = *coll_args; + ucc_mrange_uint_t *p = &tl_team->cfg.reduce_srg_kn_radix; + ucc_rank_t trank = UCC_TL_TEAM_RANK(tl_team); + ucc_schedule_t *schedule; + ucc_coll_task_t *task, *rs_task; + ucc_status_t status; + ucc_kn_radix_t radix, cfg_radix; + size_t count; + ucc_datatype_t dt; + void *rs_rbuf, *rs_sbuf; + ucc_tl_ucp_schedule_t *rsg_schedule; + ucc_memory_type_t mt; + + rsg_schedule = ucc_derived_of(sp, ucc_tl_ucp_schedule_t); + status = ucc_tl_ucp_get_schedule(tl_team, coll_args, + (ucc_tl_ucp_schedule_t **)&schedule); + if (ucc_unlikely(UCC_OK != status)) { + return status; + } + + if (UCC_IS_ROOT(coll_args->args, trank)) { + dt = coll_args->args.dst.info.datatype; + mt = coll_args->args.dst.info.mem_type; + if (UCC_IS_INPLACE(coll_args->args)) { + rs_rbuf = rsg_schedule->scratch_mc_header->addr; + rs_sbuf = coll_args->args.dst.info.buffer; + } else { + rs_rbuf = coll_args->args.dst.info.buffer; + rs_sbuf = coll_args->args.src.info.buffer; + } + count = coll_args->args.dst.info.count; + } else { + dt = coll_args->args.src.info.datatype; + mt = coll_args->args.src.info.mem_type; + rs_rbuf = rsg_schedule->scratch_mc_header->addr; + rs_sbuf = coll_args->args.src.info.buffer; + count = coll_args->args.src.info.count; + } + + if (coll_args->mask & UCC_BASE_CARGS_MAX_FRAG_COUNT) { + count = coll_args->max_frag_count; + } + + args.args.flags &= ~UCC_COLL_ARGS_FLAG_IN_PLACE; + args.args.dst.info.buffer = rs_rbuf; + 
args.args.dst.info.count = count; + args.args.dst.info.datatype = dt; + args.args.dst.info.mem_type = mt; + args.args.src.info.buffer = rs_sbuf; + args.args.src.info.count = count; + args.args.src.info.datatype = dt; + args.args.src.info.mem_type = mt; + + cfg_radix = ucc_tl_ucp_get_radix_from_range(tl_team, + count * ucc_dt_size(dt), + mt, p, 4); + radix = ucc_knomial_pattern_get_min_radix(cfg_radix, + UCC_TL_TEAM_SIZE(tl_team), + count); + + /* 1st step of reduce: knomial reduce_scatter */ + UCC_CHECK_GOTO(ucc_tl_ucp_reduce_scatter_knomial_init_r(&args, team, &task, + radix), + out, status); + UCC_CHECK_GOTO(ucc_schedule_add_task(schedule, task), out, status); + UCC_CHECK_GOTO(ucc_task_subscribe_dep(&schedule->super, task, + UCC_EVENT_SCHEDULE_STARTED), + out, status); + rs_task = task; + + /* 2nd step of reduce: knomial gather */ + args.args.src.info.buffer = rs_rbuf; + if (UCC_IS_ROOT(coll_args->args, trank)) { + if (UCC_IS_INPLACE (coll_args->args)) { + args.args.dst.info.buffer = rs_sbuf; + args.args.src.info.buffer = rs_rbuf; + } else { + args.args.dst.info.buffer = rs_rbuf; + args.args.mask |= UCC_COLL_ARGS_FIELD_FLAGS; + args.args.flags |= UCC_COLL_ARGS_FLAG_IN_PLACE; + + } + } else { + args.args.mask |= UCC_COLL_ARGS_FIELD_FLAGS; + args.args.flags |= UCC_COLL_ARGS_FLAG_IN_PLACE; + } + + UCC_CHECK_GOTO(ucc_tl_ucp_gather_knomial_init_r(&args, team, &task, radix), + out, status); + UCC_CHECK_GOTO(ucc_schedule_add_task(schedule, task), out, status); + UCC_CHECK_GOTO(ucc_task_subscribe_dep(rs_task, task, UCC_EVENT_COMPLETED), + out, status); + schedule->super.finalize = ucc_tl_ucp_reduce_srg_knomial_frag_finalize; + schedule->super.post = ucc_tl_ucp_reduce_srg_knomial_frag_start; + *frag_p = schedule; + return UCC_OK; +out: + return status; +} + +static ucc_status_t +ucc_tl_ucp_reduce_srg_knomial_finalize(ucc_coll_task_t *task) +{ + ucc_tl_ucp_schedule_t *schedule = ucc_derived_of(task, + ucc_tl_ucp_schedule_t); + ucc_status_t status; + + UCC_TL_UCP_PROFILE_REQUEST_EVENT(schedule, "ucp_reduce_srg_kn_done", 0); + if (schedule->scratch_mc_header) { + ucc_mc_free(schedule->scratch_mc_header); + } + status = ucc_schedule_pipelined_finalize(task); + ucc_tl_ucp_put_schedule(&schedule->super.super); + return status; +} + +ucc_status_t ucc_tl_ucp_reduce_srg_knomial_start(ucc_coll_task_t *task) +{ + UCC_TL_UCP_PROFILE_REQUEST_EVENT(task, "ucp_reduce_srg_kn_start", 0); + return ucc_schedule_pipelined_post(task); +} + +static void +ucc_tl_ucp_reduce_srg_knomial_get_pipeline_params(ucc_tl_ucp_team_t *team, + ucc_memory_type_t mtype, + ucc_pipeline_params_t *pp) +{ + ucc_tl_ucp_lib_config_t *cfg = &team->cfg; + ucc_mc_attr_t mc_attr; + + if (!ucc_pipeline_params_is_auto(&cfg->reduce_srg_kn_pipeline)) { + *pp = cfg->reduce_srg_kn_pipeline; + return; + } + + if (mtype == UCC_MEMORY_TYPE_CUDA) { + mc_attr.field_mask = UCC_MC_ATTR_FIELD_FAST_ALLOC_SIZE; + ucc_mc_get_attr(&mc_attr, UCC_MEMORY_TYPE_CUDA); + pp->threshold = mc_attr.fast_alloc_size; + pp->n_frags = 2; + pp->frag_size = mc_attr.fast_alloc_size; + pp->order = UCC_PIPELINE_PARALLEL; + pp->pdepth = 2; + } else { + pp->threshold = SIZE_MAX; + pp->n_frags = 0; + pp->frag_size = 0; + pp->pdepth = 1; + pp->order = UCC_PIPELINE_PARALLEL; + } +} + +ucc_status_t ucc_tl_ucp_reduce_srg_knomial_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t *team, + ucc_coll_task_t **task_h) +{ + ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t); + ucc_coll_args_t *args = &coll_args->args; + ucc_rank_t trank = UCC_TL_TEAM_RANK(tl_team); + 
int n_frags, pipeline_depth; + ucc_tl_ucp_schedule_t *schedule; + ucc_status_t st; + ucc_base_coll_args_t bargs; + size_t max_frag_count, dt_size, count; + ucc_pipeline_params_t pipeline_params; + ucc_datatype_t dt; + ucc_memory_type_t mt; + + st = ucc_tl_ucp_get_schedule(tl_team, coll_args, &schedule); + if (ucc_unlikely(UCC_OK != st)) { + goto err_out; + } + + schedule->scratch_mc_header = NULL; + if (UCC_IS_ROOT(*args, trank)) { + count = args->dst.info.count; + dt = args->dst.info.datatype; + mt = args->dst.info.mem_type; + dt_size = ucc_dt_size(dt); + } else { + count = args->src.info.count; + dt = args->src.info.datatype; + mt = args->src.info.mem_type; + dt_size = ucc_dt_size(dt); + } + + if (!UCC_IS_ROOT(*args, trank) || UCC_IS_INPLACE(*args)) { + st = ucc_mc_alloc(&schedule->scratch_mc_header, count * dt_size, mt); + if (ucc_unlikely(UCC_OK != st)) { + tl_error(team->context->lib, "failed to alloc scratch memory"); + goto err_free_schedule; + } + } + + bargs = *coll_args; + max_frag_count = (bargs.mask & UCC_BASE_CARGS_MAX_FRAG_COUNT) ? + bargs.max_frag_count: count; + ucc_tl_ucp_reduce_srg_knomial_get_pipeline_params(tl_team, mt, + &pipeline_params); + ucc_pipeline_nfrags_pdepth(&pipeline_params, max_frag_count * dt_size, + &n_frags, &pipeline_depth); + if (n_frags > 1) { + bargs.mask |= UCC_BASE_CARGS_MAX_FRAG_COUNT; + bargs.max_frag_count = ucc_buffer_block_count(max_frag_count, n_frags, 0); + } + + st = ucc_schedule_pipelined_init(&bargs, team, + ucc_tl_ucp_reduce_srg_knomial_frag_init, + ucc_tl_ucp_reduce_srg_knomial_frag_setup, + pipeline_depth, n_frags, + pipeline_params.order, + &schedule->super); + if (ucc_unlikely(UCC_OK != st)) { + tl_error(team->context->lib, "failed to init pipelined schedule"); + goto err_free_scratch; + } + + schedule->super.super.super.finalize = ucc_tl_ucp_reduce_srg_knomial_finalize; + schedule->super.super.super.post = ucc_tl_ucp_reduce_srg_knomial_start; + + *task_h = &schedule->super.super.super; + return UCC_OK; + +err_free_scratch: + ucc_mc_free(schedule->scratch_mc_header); +err_free_schedule: + ucc_tl_ucp_put_schedule(&schedule->super.super); +err_out: + return st; +} diff --git a/src/components/tl/ucp/reduce_scatter/reduce_scatter_knomial.c b/src/components/tl/ucp/reduce_scatter/reduce_scatter_knomial.c index 365579d8f9..591258abd4 100644 --- a/src/components/tl/ucp/reduce_scatter/reduce_scatter_knomial.c +++ b/src/components/tl/ucp/reduce_scatter/reduce_scatter_knomial.c @@ -20,6 +20,7 @@ size_t _count = 0; \ switch ((_args)->coll_type) { \ case UCC_COLL_TYPE_ALLREDUCE: \ + case UCC_COLL_TYPE_REDUCE: \ _count = (_args)->dst.info.count; \ break; \ case UCC_COLL_TYPE_REDUCE_SCATTER: \ @@ -48,7 +49,7 @@ typedef struct ucc_tl_ucp_rs_work_buf { void *reduce_loop; } ucc_tl_ucp_rs_work_buf_t; -/* get work buffers for allreduce */ +/* get work buffers for allreduce and reduce */ static inline void get_sbuf_rbuf_ar(ucc_tl_ucp_task_t *task, size_t block_count, ucc_tl_ucp_rs_work_buf_t *wb) @@ -188,6 +189,7 @@ static inline void get_rs_work_buf(ucc_tl_ucp_task_t *task, switch (args->coll_type) { case UCC_COLL_TYPE_ALLREDUCE: + case UCC_COLL_TYPE_REDUCE: return get_sbuf_rbuf_ar(task, block_count, wb); case UCC_COLL_TYPE_REDUCE_SCATTER: return get_sbuf_rbuf_rs(task, wb); @@ -214,6 +216,7 @@ void ucc_tl_ucp_reduce_scatter_knomial_progress(ucc_coll_task_t *coll_task) size_t data_size = count * dt_size; ucc_rank_t rank = task->subset.myrank; ucc_rank_t size = task->subset.map.ep_num; + ucc_rank_t root = 0; size_t local_seg_count = 0; 
ucc_tl_ucp_rs_work_buf_t wb = (ucc_tl_ucp_rs_work_buf_t){0}; ptrdiff_t peer_seg_offset, local_seg_offset; @@ -224,12 +227,18 @@ void ucc_tl_ucp_reduce_scatter_knomial_progress(ucc_coll_task_t *coll_task) void *local_data; int is_avg; + if (args->coll_type == UCC_COLL_TYPE_REDUCE) { + root = args->root; + rank = VRANK(rank, root, size); + } + UCC_KN_REDUCE_GOTO_PHASE(task->reduce_scatter_kn.phase); block_count = ucc_sra_kn_compute_block_count(count, rank, p); get_rs_work_buf(task, block_count, &wb); if (KN_NODE_EXTRA == node_type) { peer = ucc_ep_map_eval(task->subset.map, ucc_knomial_pattern_get_proxy(p, rank)); + peer = INV_VRANK(peer, root, size); UCPCHECK_GOTO(ucc_tl_ucp_send_nb(wb.src_data, data_size, mem_type, peer, team, task), task, out); @@ -243,6 +252,7 @@ void ucc_tl_ucp_reduce_scatter_knomial_progress(ucc_coll_task_t *coll_task) if (KN_NODE_PROXY == node_type) { peer = ucc_ep_map_eval(task->subset.map, ucc_knomial_pattern_get_extra(p, rank)); + peer = INV_VRANK(peer, root, size); UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(wb.dst_proxy, data_size, mem_type, peer, team, task), task, out); @@ -288,7 +298,8 @@ void ucc_tl_ucp_reduce_scatter_knomial_progress(ucc_coll_task_t *coll_task) } ucc_kn_rs_pattern_peer_seg(peer, p, &peer_seg_count, &peer_seg_offset); - peer = ucc_ep_map_eval(task->subset.map, peer); + peer = INV_VRANK(ucc_ep_map_eval(task->subset.map, peer), root, + size); UCPCHECK_GOTO( ucc_tl_ucp_send_nb(PTR_OFFSET(wb.src_loop, peer_seg_offset * dt_size), peer_seg_count * dt_size, mem_type, peer, @@ -371,9 +382,10 @@ ucc_status_t ucc_tl_ucp_reduce_scatter_knomial_start(ucc_coll_task_t *coll_task) ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); ucc_coll_args_t *args = &TASK_ARGS(task); ucc_tl_ucp_team_t *team = TASK_TEAM(task); - ucc_rank_t rank = task->subset.myrank; ucc_rank_t size = task->subset.map.ep_num; ucc_coll_type_t ct = args->coll_type; + ucc_rank_t root = (ct == UCC_COLL_TYPE_REDUCE) ? 
args->root : 0; + ucc_rank_t rank = VRANK(task->subset.myrank, root, size); size_t count = GET_COUNT(args); ucc_status_t status; @@ -381,7 +393,8 @@ ucc_status_t ucc_tl_ucp_reduce_scatter_knomial_start(ucc_coll_task_t *coll_task) 0); ucc_tl_ucp_task_reset(task, UCC_INPROGRESS); - if (ct == UCC_COLL_TYPE_ALLREDUCE) { + if ((ct == UCC_COLL_TYPE_ALLREDUCE) || + (ct == UCC_COLL_TYPE_REDUCE)) { ucc_kn_rsx_pattern_init(size, rank, task->reduce_scatter_kn.p.radix, count, &task->reduce_scatter_kn.p); } else { @@ -425,7 +438,8 @@ static size_t compute_scratch_size(ucc_tl_ucp_task_t *task) ucc_kn_radix_t step_radix; size_t max_recv_size; - if (args->coll_type == UCC_COLL_TYPE_ALLREDUCE) { + if ((args->coll_type == UCC_COLL_TYPE_ALLREDUCE) || + (args->coll_type == UCC_COLL_TYPE_REDUCE)) { if (KN_NODE_EXTRA != task->reduce_scatter_kn.p.node_type) { if (coll_args->mask & UCC_BASE_CARGS_MAX_FRAG_COUNT) { count = coll_args->max_frag_count; @@ -486,7 +500,9 @@ ucc_tl_ucp_reduce_scatter_knomial_init_r(ucc_base_coll_args_t *coll_args, task->super.progress = ucc_tl_ucp_reduce_scatter_knomial_progress; task->super.finalize = ucc_tl_ucp_reduce_scatter_knomial_finalize; - if (tl_team->cfg.use_reordering && ct == UCC_COLL_TYPE_ALLREDUCE) { + if (tl_team->cfg.use_reordering && + (ct == UCC_COLL_TYPE_ALLREDUCE)) { +//TODO: enable reordering for reduce sbgp = ucc_topo_get_sbgp(tl_team->topo, UCC_SBGP_FULL_HOST_ORDERED); task->subset.myrank = sbgp->group_rank; task->subset.map = sbgp->map; @@ -498,7 +514,9 @@ ucc_tl_ucp_reduce_scatter_knomial_init_r(ucc_base_coll_args_t *coll_args, if (ct == UCC_COLL_TYPE_ALLREDUCE) { ucc_kn_rsx_pattern_init(size, rank, radix, count, &task->reduce_scatter_kn.p); - + } else if (ct == UCC_COLL_TYPE_REDUCE) { + ucc_kn_rsx_pattern_init(size, VRANK(rank, coll_args->args.root, size), + radix, count, &task->reduce_scatter_kn.p); } else { ucc_kn_rs_pattern_init(size, rank, radix, count, &task->reduce_scatter_kn.p); diff --git a/src/components/tl/ucp/tl_ucp.c b/src/components/tl/ucp/tl_ucp.c index 7db99bdaf2..8b0ca7ede0 100644 --- a/src/components/tl/ucp/tl_ucp.c +++ b/src/components/tl/ucp/tl_ucp.c @@ -153,6 +153,16 @@ ucc_config_field_t ucc_tl_ucp_lib_config_table[] = { ucc_offsetof(ucc_tl_ucp_lib_config_t, reduce_kn_radix), UCC_CONFIG_TYPE_UINT}, + {"REDUCE_SRG_KN_PIPELINE", "auto", + "Pipelining settings for SRG Knomial reduce algorithm", + ucc_offsetof(ucc_tl_ucp_lib_config_t, reduce_srg_kn_pipeline), + UCC_CONFIG_TYPE_PIPELINE_PARAMS}, + + {"REDUCE_SRG_KN_RADIX", "auto", + "Radix of the scatter-reduce-gather (SRG) knomial reduce algorithm", + ucc_offsetof(ucc_tl_ucp_lib_config_t, reduce_srg_kn_radix), + UCC_CONFIG_TYPE_UINT_RANGED}, + {"GATHER_KN_RADIX", "4", "Radix of the knomial tree reduce algorithm", ucc_offsetof(ucc_tl_ucp_lib_config_t, gather_kn_radix), UCC_CONFIG_TYPE_UINT}, diff --git a/src/components/tl/ucp/tl_ucp.h b/src/components/tl/ucp/tl_ucp.h index 894a33bc6f..861ef87073 100644 --- a/src/components/tl/ucp/tl_ucp.h +++ b/src/components/tl/ucp/tl_ucp.h @@ -58,6 +58,8 @@ typedef struct ucc_tl_ucp_lib_config { uint32_t bcast_kn_radix; ucc_mrange_uint_t bcast_sag_kn_radix; uint32_t reduce_kn_radix; + ucc_pipeline_params_t reduce_srg_kn_pipeline; + ucc_mrange_uint_t reduce_srg_kn_radix; uint32_t gather_kn_radix; uint32_t gatherv_linear_num_posts; uint32_t scatter_kn_radix; diff --git a/src/components/tl/ucp/tl_ucp_coll.c b/src/components/tl/ucp/tl_ucp_coll.c index 419b602156..8cef58de08 100644 --- a/src/components/tl/ucp/tl_ucp_coll.c +++ b/src/components/tl/ucp/tl_ucp_coll.c 
@@ -367,6 +367,9 @@ ucc_status_t ucc_tl_ucp_alg_id_to_init(int alg_id, const char *alg_id_str, case UCC_TL_UCP_REDUCE_ALG_DBT: *init = ucc_tl_ucp_reduce_dbt_init; break; + case UCC_TL_UCP_REDUCE_ALG_SRG: + *init = ucc_tl_ucp_reduce_srg_knomial_init; + break; default: status = UCC_ERR_INVALID_PARAM; break; diff --git a/src/components/tl/ucp/tl_ucp_coll.h b/src/components/tl/ucp/tl_ucp_coll.h index 9668e46183..4218759f48 100644 --- a/src/components/tl/ucp/tl_ucp_coll.h +++ b/src/components/tl/ucp/tl_ucp_coll.h @@ -243,6 +243,7 @@ typedef struct ucc_tl_ucp_task { int phase; void * scratch; ucc_mc_buffer_header_t *scratch_mc_header; + ucc_knomial_pattern_t p; } gather_kn; struct { size_t merge_buf_size; @@ -320,17 +321,17 @@ static inline void ucc_tl_ucp_put_task(ucc_tl_ucp_task_t *task) } static inline ucc_status_t -ucc_tl_ucp_get_schedule(ucc_tl_ucp_team_t *team, - ucc_base_coll_args_t *args, +ucc_tl_ucp_get_schedule(ucc_tl_ucp_team_t *team, + ucc_base_coll_args_t *args, ucc_tl_ucp_schedule_t **schedule) { - ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team); + ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team); *schedule = ucc_mpool_get(&ctx->req_mp); - if (ucc_unlikely(!(*schedule))) { return UCC_ERR_NO_MEMORY; } + UCC_TL_UCP_PROFILE_REQUEST_NEW(schedule, "tl_ucp_sched", 0); return ucc_schedule_init(&((*schedule)->super.super), args, &team->super.super); @@ -342,7 +343,6 @@ static inline void ucc_tl_ucp_put_schedule(ucc_schedule_t *schedule) ucc_mpool_put(schedule); } - ucc_status_t ucc_tl_ucp_coll_init(ucc_base_coll_args_t *coll_args, ucc_base_team_t * team, ucc_coll_task_t ** task_h);
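For reference, a minimal standalone sketch of the peer-segment computation performed by ucc_kn_g_pattern_peer_seg() for KN_PATTERN_GATHER: each rank owns count/size elements, and a peer forwards at most radix_pow of those per-rank blocks, clipped at the end of the team. The helper name and plain integer types below are illustrative only and are not part of the patch.

#include <stddef.h>
#include <stdint.h>

/* Restates the KN_PATTERN_GATHER case of ucc_kn_g_pattern_peer_seg():
 *   seg_count  = min(radix_pow, team_size - peer) * (total_count / team_size)
 *   seg_offset = peer * (total_count / team_size)                           */
void gather_peer_seg_sketch(uint32_t peer, uint32_t team_size,
                            uint32_t radix_pow, size_t total_count,
                            size_t *seg_count, ptrdiff_t *seg_offset)
{
    size_t   per_rank = total_count / team_size;   /* elements owned per rank */
    uint32_t blocks   = (radix_pow < team_size - peer) ? radix_pow
                                                       : team_size - peer;

    *seg_count  = (size_t)blocks * per_rank;
    *seg_offset = (ptrdiff_t)(peer * per_rank);
}

For instance, with team_size = 8, radix_pow = 2, and total_count = 8, peer 6 contributes seg_count = 2 elements starting at seg_offset = 6, matching the ucc_min(p->radix_pow, p->size - peer) * (p->count / p->size) expression in sra_knomial.h.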