TEST: adds onesided alltoall test (#374) (#395)
* TEST: add onesided alltoall tests

* TEST: initialize global work buffer

* CODESTYLE: fix code style

* REVIEW: address feedback

* REVIEW: address feedback

* API: require work buffer initialization

* TL/UCP: update onesided a2a to match API

* TEST: update test to match API

Co-authored-by: ferrol aderholdt <[email protected]>
Co-authored-by: valentin petrov <[email protected]>
(cherry picked from commit bbb0944)

Co-authored-by: ferrol aderholdt <[email protected]>
wfaderhold21 and ferrol aderholdt authored Jan 21, 2022
1 parent 489f95f commit 71eee8e
Showing 6 changed files with 199 additions and 57 deletions.
4 changes: 2 additions & 2 deletions src/components/tl/ucp/alltoall/alltoall_onesided.c
@@ -62,12 +62,12 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_progress(ucc_coll_task_t *ctask)
ucc_rank_t gsize = UCC_TL_TEAM_SIZE(team);
long * pSync = TASK_ARGS(task).global_work_buffer;

-    if (*pSync < gsize - 1 || task->send_completed < task->send_posted) {
+    if (*pSync < gsize || task->send_completed < task->send_posted) {
ucp_worker_progress(UCC_TL_UCP_TEAM_CTX(team)->ucp_worker);
return UCC_INPROGRESS;
}

-    *pSync = -1;
+    pSync[0] = 0;
task->super.super.status = UCC_OK;
ucc_task_complete(ctask);
return task->super.super.status;
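The check above treats the user-supplied global work buffer as a shared arrival counter: under the updated API it starts at 0, each of the gsize peers bumps it once its one-sided data has landed, and completion also requires every locally posted send to have finished. Resetting the counter to 0 (rather than the old -1) leaves the buffer back in the state the API now requires, so it can be reused by the next collective. Below is a minimal model of that protocol, not the actual TL/UCP implementation; the struct and function names are illustrative, and the remote atomic increment that drives on_peer_arrival() is an assumption.

```cpp
// Simplified model of the pSync protocol used above -- not the TL/UCP code.
#include <cstdint>

struct a2a_state {
    long     *psync;      // user-provided global work buffer, starts at 0
    uint32_t  gsize;      // team size
    uint32_t  posted;     // one-sided sends posted by this rank
    uint32_t  completed;  // one-sided sends completed locally
};

// Invoked (conceptually) when a peer's data and its atomic add arrive.
static void on_peer_arrival(a2a_state *s) { ++(*s->psync); }

// Mirrors the completion test in ucc_tl_ucp_alltoall_onesided_progress().
static bool a2a_done(a2a_state *s)
{
    if (*s->psync < static_cast<long>(s->gsize) || s->completed < s->posted) {
        return false;   // still waiting: UCC_INPROGRESS
    }
    *s->psync = 0;      // re-arm: buffer is zero again, as the API requires
    return true;        // UCC_OK
}
```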
19 changes: 10 additions & 9 deletions src/ucc/api/ucc.h
@@ -1848,15 +1848,16 @@ typedef struct ucc_coll_args {
collectives */
ucc_error_type_t error_type; /*!< Error type */
ucc_coll_id_t tag; /*!< Used for ordering collectives */
-    void *global_work_buffer; /*!< User allocated scratchpad
-                                   buffer for one-sided
-                                   collectives. The buffer
-                                   provided should be at least
-                                   the size returned by @ref
-                                   ucc_context_get_attr with
-                                   the field mask -
-                                   UCC_CONTEXT_ATTR_FIELD_WORK_BUFFER_SIZE
-                                   set to 1. */
+    void *global_work_buffer; /*!< User allocated scratchpad
+                                   buffer for one-sided
+                                   collectives. The buffer
+                                   provided should be at least
+                                   the size returned by @ref
+                                   ucc_context_get_attr with
+                                   the field mask -
+                                   UCC_CONTEXT_ATTR_FIELD_WORK_BUFFER_SIZE
+                                   set to 1. The buffer must be initialized
+                                   to 0. */
ucc_coll_callback_t cb;
double timeout; /*!< Timeout in seconds */
} ucc_coll_args_t;
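The added sentence makes zero-initialization of the scratchpad a hard requirement, which is what lets the progress function above count up from 0 and reset to 0. A minimal caller-side sketch follows; ctx is an existing ucc_context_h, and the attribute field name global_work_buffer_size plus the args mask bit UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER are recalled from the UCC API rather than taken from this diff, so verify them against ucc.h.

```cpp
// Sketch: size, allocate, and zero the scratchpad for one-sided collectives.
// Names marked ASSUMED are not visible in this diff and should be checked.
ucc_context_attr_t attr;
attr.mask = UCC_CONTEXT_ATTR_FIELD_WORK_BUFFER_SIZE;
ASSERT_EQ(UCC_OK, ucc_context_get_attr(ctx, &attr));

void *work_buf = calloc(1, attr.global_work_buffer_size);  // ASSUMED field name
ASSERT_NE(nullptr, work_buf);                              // calloc => all zeros

ucc_coll_args_t args = {};
args.mask              |= UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER;  // ASSUMED flag
args.global_work_buffer = work_buf;
// ... set coll_type, src/dst buffers, then ucc_collective_init()/post() ...
```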
85 changes: 71 additions & 14 deletions test/gtest/common/test_ucc.cc
@@ -41,6 +41,11 @@ UccProcess::~UccProcess()
{
EXPECT_EQ(UCC_OK, ucc_context_destroy(ctx_h));
EXPECT_EQ(UCC_OK, ucc_finalize(lib_h));
if (ctx_params.mask & UCC_CONTEXT_PARAM_FIELD_MEM_PARAMS) {
for (auto i = 0; i < UCC_TEST_N_MEM_SEGMENTS; i++) {
ucc_free(onesided_buf[i]);
}
}
}

ucc_status_t UccTeam::allgather(void *src_buf, void *recv_buf, size_t size,
@@ -157,7 +162,8 @@ uint64_t rank_map_cb(uint64_t ep, void *cb_ctx) {
return (uint64_t)team->procs[(int)ep].p.get()->job_rank;
}

-void UccTeam::init_team(bool use_team_ep_map, bool use_ep_range)
+void UccTeam::init_team(bool use_team_ep_map, bool use_ep_range,
+                        bool is_onesided)
{
ucc_team_params_t team_params;
std::vector<allgather_coll_info_t *> cis;
@@ -189,6 +195,10 @@ void UccTeam::init_team(bool use_team_ep_map, bool use_ep_range)
team_params.oob.oob_ep = i;
team_params.mask |= UCC_TEAM_PARAM_FIELD_OOB;
}
if (is_onesided) {
team_params.mask |= UCC_TEAM_PARAM_FIELD_FLAGS;
team_params.flags = UCC_TEAM_FLAG_COLL_WORK_BUFFER;
}
EXPECT_EQ(UCC_OK,
ucc_team_create_post(&(procs[i].p.get()->ctx_h), 1, &team_params,
&(procs[i].team)));
@@ -211,7 +221,6 @@ void UccTeam::init_team(bool use_team_ep_map, bool use_ep_range)
}
}


void UccTeam::destroy_team()
{
ucc_status_t status;
@@ -241,7 +250,7 @@ void UccTeam::progress()
}

UccTeam::UccTeam(std::vector<UccProcess_h> &_procs, bool use_team_ep_map,
-                 bool use_ep_range)
+                 bool use_ep_range, bool is_onesided)
{
n_procs = _procs.size();
ag.resize(n_procs);
@@ -252,7 +261,7 @@ UccTeam::UccTeam(std::vector<UccProcess_h> &_procs, bool use_team_ep_map,
a.phase = AG_INIT;
}
copy_complete_count = 0;
-    init_team(use_team_ep_map, use_ep_range);
+    init_team(use_team_ep_map, use_ep_range, is_onesided);
// test_allgather(128);
}

@@ -293,7 +302,6 @@ UccJob::UccJob(int _n_procs, ucc_job_ctx_mode_t _ctx_mode, ucc_job_env_t vars) :
/*restore original env */
setenv(v.first.c_str(), v.second.c_str(), 1);
}

}

void thread_allgather(void *src_buf, void *recv_buf, size_t size,
@@ -391,13 +399,61 @@ void proc_context_create(UccProcess_h proc, int id, ThreadAllgather *ta, bool is
throw std::runtime_error(err_msg.str());
}

void proc_context_create_mem_params(UccProcess_h proc, int id,
ThreadAllgather *ta)
{
ucc_status_t status;
ucc_context_config_h ctx_config;
std::stringstream err_msg;
ucc_mem_map_t map[UCC_TEST_N_MEM_SEGMENTS];

status = ucc_context_config_read(proc->lib_h, NULL, &ctx_config);
if (status != UCC_OK) {
err_msg << "ucc_context_config_read failed";
goto exit_err;
}
for (auto i = 0; i < UCC_TEST_N_MEM_SEGMENTS; i++) {
proc->onesided_buf[i] =
ucc_calloc(UCC_TEST_MEM_SEGMENT_SIZE, 1, "onesided_buffer");
EXPECT_NE(proc->onesided_buf[i], nullptr);
map[i].address = proc->onesided_buf[i];
map[i].len = UCC_TEST_MEM_SEGMENT_SIZE;
}
proc->ctx_params.mask = UCC_CONTEXT_PARAM_FIELD_OOB;
proc->ctx_params.mask |= UCC_CONTEXT_PARAM_FIELD_MEM_PARAMS;
proc->ctx_params.oob.allgather = thread_allgather_start;
proc->ctx_params.oob.req_test = thread_allgather_req_test;
proc->ctx_params.oob.req_free = thread_allgather_req_free;
proc->ctx_params.oob.coll_info = (void *)&ta->reqs[id];
proc->ctx_params.oob.n_oob_eps = ta->n_procs;
proc->ctx_params.oob.oob_ep = id;
proc->ctx_params.mem_params.segments = map;
proc->ctx_params.mem_params.n_segments = UCC_TEST_N_MEM_SEGMENTS;
status = ucc_context_create(proc->lib_h, &proc->ctx_params, ctx_config,
&proc->ctx_h);
ucc_context_config_release(ctx_config);
if (status != UCC_OK) {
err_msg << "ucc_context_create for one-sided context failed";
goto exit_err;
}
return;

exit_err:
err_msg << ": " << ucc_status_string(status) << " (" << status << ")";
throw std::runtime_error(err_msg.str());
}

void UccJob::create_context()
{
std::vector<std::thread> workers;
for (auto i = 0; i < procs.size(); i++) {
-        workers.push_back(std::thread(proc_context_create, procs[i], i, &ta,
-                                      ctx_mode == UCC_JOB_CTX_GLOBAL));
+        if (ctx_mode == UCC_JOB_CTX_GLOBAL_ONESIDED) {
+            workers.push_back(
+                std::thread(proc_context_create_mem_params, procs[i], i, &ta));
+        } else {
+            workers.push_back(std::thread(proc_context_create, procs[i], i, &ta,
+                                          ctx_mode == UCC_JOB_CTX_GLOBAL));
+        }
}
for (auto i = 0; i < procs.size(); i++) {
workers[i].join();
@@ -464,28 +520,29 @@ void UccJob::cleanup()
}

UccTeam_h UccJob::create_team(int _n_procs, bool use_team_ep_map,
-                              bool use_ep_range)
+                              bool use_ep_range, bool is_onesided)
{
EXPECT_GE(n_procs, _n_procs);
std::vector<UccProcess_h> team_procs;
-    for (int i=0; i<_n_procs; i++) {
+    for (int i = 0; i < _n_procs; i++) {
team_procs.push_back(procs[i]);
}
-    return std::make_shared<UccTeam>(team_procs, use_team_ep_map, use_ep_range);
+    return std::make_shared<UccTeam>(team_procs, use_team_ep_map, use_ep_range,
+                                     is_onesided);
}

UccTeam_h UccJob::create_team(std::vector<int> &ranks, bool use_team_ep_map,
-                              bool use_ep_range)
+                              bool use_ep_range, bool is_onesided)
{
EXPECT_GE(n_procs, ranks.size());
std::vector<UccProcess_h> team_procs;
-    for (int i=0; i<ranks.size(); i++) {
+    for (int i = 0; i < ranks.size(); i++) {
team_procs.push_back(procs[ranks[i]]);
}
-    return std::make_shared<UccTeam>(team_procs, use_team_ep_map, use_ep_range);
+    return std::make_shared<UccTeam>(team_procs, use_team_ep_map, use_ep_range,
+                                     is_onesided);
}


UccReq::UccReq(UccTeam_h _team, ucc_coll_args_t *args) :
team(_team)
{
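Together, the new UCC_JOB_CTX_GLOBAL_ONESIDED mode and the is_onesided flag let a test build contexts with pre-registered memory segments and teams created with UCC_TEAM_FLAG_COLL_WORK_BUFFER. A rough usage sketch against the helpers above follows; it is not the commit's new test file (that file is part of the change set but not shown here), and it assumes the UccJob constructor defaults the env-vars argument and creates the per-process contexts the same way the existing modes do.

```cpp
// Hypothetical test body using the new harness knobs.
UccJob job(4, UccJob::UCC_JOB_CTX_GLOBAL_ONESIDED);   // ASSUMED: vars defaulted
// use_team_ep_map = false, use_ep_range = true, is_onesided = true
UccTeam_h team = job.create_team(4, false, true, true);
// Each UccProcess now owns UCC_TEST_N_MEM_SEGMENTS buffers of
// UCC_TEST_MEM_SEGMENT_SIZE bytes (onesided_buf[]), registered through
// UCC_CONTEXT_PARAM_FIELD_MEM_PARAMS and freed automatically in ~UccProcess().
```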
15 changes: 10 additions & 5 deletions test/gtest/common/test_ucc.h
@@ -129,6 +129,7 @@ class UccProcess {
};
ucc_lib_h lib_h;
ucc_context_h ctx_h;
void * onesided_buf[3];
int job_rank;
UccProcess(int _job_rank,
const ucc_lib_params_t &lp = default_lib_params,
@@ -163,7 +164,7 @@ class UccTeam {
UccTeam *self;
} allgather_coll_info_t;
std::vector<struct allgather_data> ag;
-    void init_team(bool use_team_ep_map, bool use_ep_range);
+    void init_team(bool use_team_ep_map, bool use_ep_range, bool is_onesided);
void destroy_team();
void test_allgather(size_t msglen);
static ucc_status_t allgather(void *src_buf, void *recv_buf, size_t size,
@@ -176,7 +177,7 @@ class UccTeam {
void progress();
std::vector<proc> procs;
UccTeam(std::vector<UccProcess_h> &_procs, bool use_team_ep_map = false,
-            bool use_ep_range = true);
+            bool use_ep_range = true, bool is_onesided = false);
~UccTeam();
};
typedef std::shared_ptr<UccTeam> UccTeam_h;
@@ -191,7 +192,8 @@ class UccJob {
public:
typedef enum {
UCC_JOB_CTX_LOCAL,
-        UCC_JOB_CTX_GLOBAL /*< ucc ctx create with OOB */
+        UCC_JOB_CTX_GLOBAL, /*< ucc ctx create with OOB */
+        UCC_JOB_CTX_GLOBAL_ONESIDED
} ucc_job_ctx_mode_t;
static const int nStaticTeams = 3;
static const int staticUccJobSize = 16;
@@ -205,9 +207,9 @@ class UccJob {
~UccJob();
std::vector<UccProcess_h> procs;
UccTeam_h create_team(int n_procs, bool use_team_ep_map = false,
-                          bool use_ep_range = true);
+                          bool use_ep_range = true, bool is_onesided = false);
UccTeam_h create_team(std::vector<int> &ranks, bool use_team_ep_map = false,
-                          bool use_ep_range = true);
+                          bool use_ep_range = true, bool is_onesided = false);
void create_context();
ucc_job_ctx_mode_t ctx_mode;
};
@@ -241,4 +243,7 @@ void clear_buffer(void *_buf, size_t size, ucc_memory_type_t mt, uint8_t value);
UCC_DT_UINT8, UCC_DT_UINT16, UCC_DT_UINT32, UCC_DT_UINT64, UCC_DT_UINT128,\
UCC_DT_FLOAT16, UCC_DT_FLOAT32, UCC_DT_FLOAT64)

#define UCC_TEST_N_MEM_SEGMENTS 3
#define UCC_TEST_MEM_SEGMENT_SIZE (1 << 20)

#endif
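The two macros size the harness: each process allocates UCC_TEST_N_MEM_SEGMENTS buffers of UCC_TEST_MEM_SEGMENT_SIZE bytes, matching the onesided_buf[3] member added to UccProcess and the segments registered in proc_context_create_mem_params(). The commit's new gtest source is among the six changed files but is not shown above; what follows is only a hedged sketch of how such a test might post a one-sided alltoall over those segments. The in-scope variables (team, ctx, proc, work_buf, n_ranks) and the args mask bit for the work buffer are assumptions, not taken from this diff.

```cpp
// Sketch (not the committed test): one-sided alltoall with src/dst placed in
// the pre-registered segments and a zeroed global work buffer.
ucc_coll_args_t args = {};
args.mask               = UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER;  // ASSUMED flag
args.coll_type          = UCC_COLL_TYPE_ALLTOALL;
args.src.info.buffer    = proc->onesided_buf[0];   // inside a registered segment
args.src.info.count     = n_ranks;
args.src.info.datatype  = UCC_DT_UINT64;
args.src.info.mem_type  = UCC_MEMORY_TYPE_HOST;
args.dst.info.buffer    = proc->onesided_buf[1];
args.dst.info.count     = n_ranks;
args.dst.info.datatype  = UCC_DT_UINT64;
args.dst.info.mem_type  = UCC_MEMORY_TYPE_HOST;
args.global_work_buffer = work_buf;                // calloc'ed, so already zero

ucc_coll_req_h req;
ASSERT_EQ(UCC_OK, ucc_collective_init(&args, &req, team));
ASSERT_EQ(UCC_OK, ucc_collective_post(req));
while (ucc_collective_test(req) == UCC_INPROGRESS) {
    ucc_context_progress(ctx);
}
ucc_collective_finalize(req);
```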
