Dev refactor xccl primitive #10613
base: master
Conversation
Whether to replace with AllToAll:

1. `oneflow/core/job/collective_boxing/nccl_executor_backend.cu` ❌

Code:

```cpp
for (int64_t j = 0; j < num_ranks; ++j) {
  OF_NCCL_CHECK(ncclSend(reinterpret_cast<const void*>(
                             reinterpret_cast<const char*>(send_buff) + j * chunk_size),
                         elem_per_chunk, nccl_data_type, j, comm, stream_ctx->stream()));
  OF_NCCL_CHECK(ncclRecv(
      reinterpret_cast<void*>(reinterpret_cast<char*>(recv_buff) + j * chunk_size),
      elem_per_chunk, nccl_data_type, j, comm, stream_ctx->stream()));
}
```

Can stay unreplaced for CUDA, provided the device side also implements `xccl_executor_backend.cu`.
2. `oneflow/core/kernel/nccl_send_recv_boxing_kernel.cpp` ❓

Code:

```cpp
OF_NCCL_CHECK(ncclGroupStart());
for (int64_t i = 0; i < parallel_num; ++i) {
  if (this->has_input() && send_elem_cnts.at(i) != 0) {
    OF_NCCL_CHECK(ncclSend(send_in_ptr.at(i), send_elem_cnts.at(i), GetNcclDataType(data_type), i,
                           comm, cuda_stream));
  }
  if (this->has_output() && recv_elem_cnts.at(i) != 0) {
    OF_NCCL_CHECK(ncclRecv(recv_out_ptr.at(i), recv_elem_cnts.at(i), GetNcclDataType(data_type),
                           i, comm, cuda_stream));
  }
}
OF_NCCL_CHECK(ncclGroupEnd());
```

Should be replaced with AllToAll, but a few details need further study.
3. `oneflow/user/kernels/eager_nccl_s2s_kernel.cu` ✅

Link: oneflow/user/kernels/eager_nccl_s2s_kernel.cu

Code:

```cpp
OF_NCCL_CHECK(ncclGroupStart());
const int64_t elem_per_chunk = elem_cnt / num_ranks;
const int64_t chunk_size = elem_per_chunk * dtype_size;
for (int64_t j = 0; j < num_ranks; ++j) {
  OF_NCCL_CHECK(ncclSend(reinterpret_cast<const void*>(
                             reinterpret_cast<const char*>(pack_to_ptr) + j * chunk_size),
                         elem_per_chunk, GetNcclDataType(in->data_type()), j,
                         kernel_cache->comm(),
                         ctx->stream()->As<ep::CudaStream>()->cuda_stream()));
  OF_NCCL_CHECK(ncclRecv(
      reinterpret_cast<void*>(reinterpret_cast<char*>(unpack_from_ptr) + j * chunk_size),
      elem_per_chunk, GetNcclDataType(in->data_type()), j, kernel_cache->comm(),
      ctx->stream()->As<ep::CudaStream>()->cuda_stream()));
}
OF_NCCL_CHECK(ncclGroupEnd());
```

Can be replaced; chunks are evenly distributed.

4. `oneflow/user/kernels/nccl_logical_2d_sbp_kernels.cpp` ✅

Code:

```cpp
OF_NCCL_CHECK(ncclGroupStart());
const int64_t elem_per_chunk = elem_cnt / num_ranks;
const int64_t chunk_size = elem_per_chunk * dtype_size;
for (int64_t j = 0; j < num_ranks; ++j) {
  OF_NCCL_CHECK(ncclSend(reinterpret_cast<const void*>(
                             reinterpret_cast<const char*>(pack_to_ptr) + j * chunk_size),
                         elem_per_chunk, GetNcclDataType(in->data_type()), j,
                         kernel_state->comm(),
                         ctx->stream()->As<ep::CudaStream>()->cuda_stream()));
  OF_NCCL_CHECK(ncclRecv(
      reinterpret_cast<void*>(reinterpret_cast<char*>(unpack_from_ptr) + j * chunk_size),
      elem_per_chunk, GetNcclDataType(in->data_type()), j, kernel_state->comm(),
      ctx->stream()->As<ep::CudaStream>()->cuda_stream()));
}
OF_NCCL_CHECK(ncclGroupEnd());
```

Can be replaced; chunks are evenly distributed.

5. `oneflow/user/kernels/nccl_logical_fusion_kernel.cpp` ✅

There are two occurrences in this file; both can be replaced.

6. `oneflow/user/kernels/nccl_logical_kernels.cpp` ✅

Can be replaced.

7. `oneflow/user/kernels/nccl_logical_send_recv_kernel.cpp` ❓

Code:

```cpp
for (int64_t i = 0; i < parallel_num; ++i) {
  if (send_elem_cnts.at(i) != 0) {
    LOG(INFO) << parallel_id << " send " << send_elem_cnts.at(i) << " to " << i;
    OF_NCCL_CHECK(ncclSend(send_in_ptr.at(i), send_elem_cnts.at(i), GetNcclDataType(data_type), i,
                           comm, cuda_stream));
  }
  if (recv_elem_cnts.at(i) != 0) {
    LOG(INFO) << parallel_id << " recv " << recv_elem_cnts.at(i) << " from " << i;
    OF_NCCL_CHECK(ncclRecv(recv_out_ptr.at(i), recv_elem_cnts.at(i), GetNcclDataType(data_type),
                           i, comm, cuda_stream));
  }
}
```

Needs more thought.

8. `oneflow/user/kernels/one_embedding_data_shuffle.cuh` ❌

Code:

```cpp
for (int64_t i = 0; i < parallel_num; ++i) {
  OF_NCCL_CHECK(ncclSend(send_data + send_offsets.at(i), send_elem_cnt.at(i), nccl_data_type, i,
                         comm, cuda_stream));
  OF_NCCL_CHECK(ncclRecv(recv_data + recv_offsets.at(i), recv_elem_cnt.at(i), nccl_data_type, i,
                         comm, cuda_stream));
}
```

Leave unreplaced for now; one_embedding is not supported yet.
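The even-chunk cases (3–6) all share the same layout: `elem_cnt` divides evenly by `num_ranks`, and rank `j`'s chunk starts at byte offset `j * chunk_size`. A small runnable sketch of that offset computation (`ChunkByteOffsets` is an illustrative name, not an existing OneFlow function); these are exactly the offsets a uniform AllToAll primitive would use in place of the send/recv loop:

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Sketch of the chunking used in cases 3-6 above: elem_cnt is evenly
// divisible by num_ranks, so rank j's chunk starts at j * chunk_size
// bytes into the packed buffer.
std::vector<int64_t> ChunkByteOffsets(int64_t elem_cnt, int64_t num_ranks, int64_t dtype_size) {
  assert(elem_cnt % num_ranks == 0);  // "chunks are evenly distributed"
  const int64_t elem_per_chunk = elem_cnt / num_ranks;
  const int64_t chunk_size = elem_per_chunk * dtype_size;
  std::vector<int64_t> offsets(num_ranks);
  for (int64_t j = 0; j < num_ranks; ++j) { offsets[j] = j * chunk_size; }
  return offsets;
}
```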
```cpp
virtual ccl::CclComm GetCclCommForDevice(
    const std::set<std::pair<int64_t, int64_t>>& device_set) {
  ccl::CclComm ccl_comm{};
  return ccl_comm;
}
virtual ccl::CclComm GetCclCommForDeviceAndStreamName(
    const std::set<std::pair<int64_t, int64_t>>& device_set, const std::string& stream_name) {
  ccl::CclComm ccl_comm{};
  return ccl_comm;
}
```
These two methods are not a good fit for the abstract base class; they do not apply to the CPU-related implementations.
Hmm, how about defining them as pure virtual then? The CPU side currently has no CommMgr subclass implementation anyway (and if CPU needs one later, having the subclass methods just call `UNIMPLEMENTED()` also seems acceptable?).
`device_set` also comes from the `ParallelDesc`. The shape of this interface could be changed: take a `ParallelDesc` as input and return a `CommunicationContext`.
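The interface shape proposed above can be sketched in plain C++. Note that `ParallelDesc` and `CommunicationContext` here are minimal stand-ins for the real OneFlow types, and `GetCommContext` is a hypothetical name:

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <set>
#include <utility>

// Stand-in types for the sketch; the real ParallelDesc and
// CommunicationContext live elsewhere in OneFlow.
struct ParallelDesc {
  std::set<std::pair<int64_t, int64_t>> device_set;  // (machine_id, device_id) pairs
};
struct CommunicationContext {
  std::set<std::pair<int64_t, int64_t>> device_set;
};

// Proposed shape: derive the device set from the ParallelDesc inside
// the manager, instead of taking a raw device_set parameter.
std::shared_ptr<CommunicationContext> GetCommContext(const ParallelDesc& parallel_desc) {
  auto ctx = std::make_shared<CommunicationContext>();
  ctx->device_set = parallel_desc.device_set;  // derived internally, not passed in
  return ctx;
}
```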
```cpp
// abstract base class for comm
class CommBase {
 public:
  virtual ~CommBase() = default;

  // return impl of comm
  virtual void* getComm() = 0;
};

class CclComm {
 public:
  CclComm() {}
  explicit CclComm(std::shared_ptr<CommBase> comm) : comm_(std::move(comm)) {}

  void* getComm() { return comm_->getComm(); }

 private:
  std::shared_ptr<CommBase> comm_{};
};
```
Doesn't this duplicate the role of `CommunicationContext`? There is no need to introduce a second mechanism.
I think there is still a difference: as I understand it, `CommunicationContext` initializes the corresponding comm implementation from a `ParallelDesc`, whereas `CclComm` is handed an already-created comm object, wraps it, and exposes a unified `getComm` method.
> `CclComm` is handed an already-created comm object, wraps it, and exposes a unified `getComm` method

That "already-created comm object" is itself also created from a `ParallelDesc`, isn't it? Essentially they are the same thing.
```cpp
virtual void Launch(ep::Stream* stream, void* out, size_t elem_cnt, int64_t src) const = 0;

virtual void Launch(ep::Stream* stream, void* out, size_t elem_cnt, int64_t src,
                    CclComm ccl_comm) const = 0;
```
Likewise, there is no need to define two versions of the `Launch` interface.
Mainly because the old set is hard to change 😂, so I added a new one. In the old design the comm is obtained rather indirectly; for example, in the CUDA recv implementation here it comes from `const auto& comm_and_peer_rank = GetNcclCommAndPeerNcclRank(src);`. The new overload takes a `CclComm` directly, which feels more intuitive.
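The tradeoff being discussed can be modeled in plain C++. All types below are illustrative stand-ins, not OneFlow's real `Launch` signature: the old-style overload resolves the comm internally, while the new-style overload receives it explicitly from the caller:

```cpp
#include <cassert>
#include <cstdint>

// Illustrative stand-ins only; not OneFlow's real types.
struct Comm { int id; };

// Models the internal lookup (cf. GetNcclCommAndPeerNcclRank(src)):
// the comm is resolved implicitly from the peer rank.
Comm LookupComm(int64_t src) { return Comm{static_cast<int>(src % 8)}; }

struct RecvOp {
  // old-style Launch: the comm is looked up inside the call
  int Launch(int64_t src) const { return LookupComm(src).id; }
  // new-style Launch: the caller passes the comm directly
  int Launch(int64_t src, Comm comm) const { return comm.id; }
};
```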
View latest API docs preview at: https://oneflow-staging.oss-cn-beijing.aliyuncs.com/docs/Oneflow-Inc/oneflow/pr/10613/
(under the `oneflow/user/kernels/collective_communication/include` directory) and provide subclass implementations.