From ffabacb8da7bd3a3ddb0701da1929a93cadad592 Mon Sep 17 00:00:00 2001 From: qijun Date: Tue, 27 Oct 2020 17:15:18 +0800 Subject: [PATCH 1/5] init --- doc/allreduce.md | 113 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) create mode 100644 doc/allreduce.md diff --git a/doc/allreduce.md b/doc/allreduce.md new file mode 100644 index 00000000..a77c3eaa --- /dev/null +++ b/doc/allreduce.md @@ -0,0 +1,113 @@ +# AllReduce + + +## Introduction + +Data parallelism enables distributed training by communicating gradients before the optimizer step to make sure that parameters of all model replicas are updated using exactly the same set of gradients, and hence model replicas can stay consistent across iterations. + +AllReduce is a common strategy to communicating gradients in data parallelism. Following is the persudo code describing the training procedures under AllReduce strategy. + +```python +broadcast(parameters, rank=0) +while True: + load_minibatch() + forward() + backward() + allreduce(gradients) + update() +``` + +First, we broadcast model parameters of rank 0 to other processes. Then, each process loads a minibatch of training data, does forward/backward computation, and gets the gradients. We launch AllReduce to communicate gradients among the processes. At last, we update parameters in each process individually. + +## AllReduce in PyTorch + +Before discussing how to support AllReduce in GoTorch, it's necessary to have a thorough suvery on current implementation of AllReduce in PyTorch. + +PyTorch offers several tools to facilitate distributed training, including [DataParallel](https://pytorch.org/docs/stable/generated/torch.nn.DataParallel.html#torch.nn.DataParallel) for single-process multi-thread data parallel training using multiple GPUs on the same machine, [DistributedDataParallel](https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html#torch.nn.parallel.DistributedDataParallel) for multi-process data parallel training across GPUs and machines. + +Single-process multi-GPU is not the recommended mode now, becase of its overhead of scatter/gather and GIL contention in every forward pass. So, let's focus on DistributedDataParallel. + + +### Collective Communication Library + +PyTorch could use different collective communication libraries as the backend, including [NCCL](https://developer.nvidia.com/nccl) and [Gloo](https://github.com/facebookincubator/gloo). NCCL supports GPU, while Gloo supports both CPU and GPU. The performance on GPU of NCCL is better than Gloo. So we use NCCL in GPU training, and Gloo in CPU training. + +Besides, PyTorch provides a library, [c10d](https://github.com/pytorch/pytorch/tree/master/torch/lib/c10d), which wrappers NCCL/Gloo, to manipulate `torch::Tensor` directly. It brings much convenience. + +The following is an example: + +```cpp +#include +#include + +using namespace ::c10d; + +int main(int argc, char** argv) { + int rank = atoi(getenv("RANK")); + int size = atoi(getenv("SIZE")); + auto store = std::make_shared("/tmp/c10d_example", size); + ProcessGroupGloo pg(store, rank, size); + + // Create some tensors + const auto ntensors = 10; + std::vector tensors; + for (auto i = 0; i < ntensors; i++) { + auto x = + at::ones({1000, 16 * (i + 1)}, at::TensorOptions(at::CPU(at::kFloat))); + tensors.push_back(x); + } + + // Kick off work + std::vector> pending; + for (auto i = 0; i < ntensors; i++) { + std::vector tmp = {tensors[i]}; + pending.push_back(pg.allreduce(tmp)); + } + + // Wait for work to complete + for (auto& work : pending) { + work->wait(); + } +} +``` + +### DistributedSampler + +The training samples are partitioned statically in distributed training of PyTorch. The [DistributedSampler](https://pytorch.org/docs/stable/_modules/torch/utils/data/distributed.html#DistributedSampler) generates a sequence of indices of training samples for each training process. Then, each training process load subset samples by the indices. + +### Launch Utility + +The `torch.distributed` package provides a launch utility in [torch.distributed.launch](https://github.com/pytorch/pytorch/blob/master/torch/distributed/launch.py). This helper utility can be used to launch multiple processes per node for distributed training. If the utility is used for GPU training, each distributed process will be operating on a single GPU. + +### Optimization + +The naive implementation of training procedures in [Introduction](#Introduction) section has two performance concerns: + +- Collective communication performs poorly on small tensors, which will be especially prominent on large models with massive numbers of small parameters. + +- Separating gradient computation and synchronization forfeits the opportunity to overlap computation with communication due to the hard boundary in between. + +PyTorch does more optimizations to solve these two problems: + +- bucket gradients to reduce AllReduce kernels overhead. +- register AllReduce kernels as autograd hooks to overlap communication and computation. + +For more details, please refer to the [paper](https://arxiv.org/abs/2006.15704). + + +## AllReduce in GoTorch + +### Stage 1 + + + + + +### Stage 2 + +## Reference + +- https://pytorch.org/docs +- https://arxiv.org/abs/2006.15704 + + From 9949a548a61b405a0b2098e37a03aaa38afc7ed3 Mon Sep 17 00:00:00 2001 From: qijun Date: Tue, 27 Oct 2020 17:38:52 +0800 Subject: [PATCH 2/5] add gotorch part --- doc/allreduce.md | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/doc/allreduce.md b/doc/allreduce.md index a77c3eaa..7cd0d764 100644 --- a/doc/allreduce.md +++ b/doc/allreduce.md @@ -1,6 +1,5 @@ # AllReduce - ## Introduction Data parallelism enables distributed training by communicating gradients before the optimizer step to make sure that parameters of all model replicas are updated using exactly the same set of gradients, and hence model replicas can stay consistent across iterations. @@ -27,7 +26,6 @@ PyTorch offers several tools to facilitate distributed training, including [Data Single-process multi-GPU is not the recommended mode now, becase of its overhead of scatter/gather and GIL contention in every forward pass. So, let's focus on DistributedDataParallel. - ### Collective Communication Library PyTorch could use different collective communication libraries as the backend, including [NCCL](https://developer.nvidia.com/nccl) and [Gloo](https://github.com/facebookincubator/gloo). NCCL supports GPU, while Gloo supports both CPU and GPU. The performance on GPU of NCCL is better than Gloo. So we use NCCL in GPU training, and Gloo in CPU training. @@ -75,6 +73,8 @@ int main(int argc, char** argv) { The training samples are partitioned statically in distributed training of PyTorch. The [DistributedSampler](https://pytorch.org/docs/stable/_modules/torch/utils/data/distributed.html#DistributedSampler) generates a sequence of indices of training samples for each training process. Then, each training process load subset samples by the indices. +**Note:** The dataset is assumed to be of constant size. + ### Launch Utility The `torch.distributed` package provides a launch utility in [torch.distributed.launch](https://github.com/pytorch/pytorch/blob/master/torch/distributed/launch.py). This helper utility can be used to launch multiple processes per node for distributed training. If the utility is used for GPU training, each distributed process will be operating on a single GPU. @@ -89,21 +89,31 @@ The naive implementation of training procedures in [Introduction](#Introduction) PyTorch does more optimizations to solve these two problems: -- bucket gradients to reduce AllReduce kernels overhead. -- register AllReduce kernels as autograd hooks to overlap communication and computation. +- Bucketing gradients to reduce AllReduce kernels overhead. +- Registering AllReduce kernels as autograd hooks to overlap communication and computation. For more details, please refer to the [paper](https://arxiv.org/abs/2006.15704). ## AllReduce in GoTorch -### Stage 1 +We plan to implement the functionalities of DistributedDataParallel gradually in GoTorch. At stage 1, we provide a naive solution. A MNIST distributed example is the target in this stage. At stage 2, we will provide a optimized solution. Bucketing gradients and registering hooks will be implemented at this stage. + +### RecordIODataLoader + + + +### Go wrapper of c10d Library +[ProcessGroupNCCL](https://github.com/pytorch/pytorch/blob/master/torch/lib/c10d/ProcessGroupNCCL.hpp) implements NCCL bindings for c10d library. After adding a Go wrapper of this class, we could doing allreduce on torch tensors in Go. +### Go Launch Utility +Go provides [os/exec](https://golang.org/pkg/os/exec/) library to spawn processes. +### Optimization at Stage 2 -### Stage 2 +TBD ## Reference From 59f653bc73ad322d45820caae2e054cea81bd9f6 Mon Sep 17 00:00:00 2001 From: qijun Date: Wed, 28 Oct 2020 11:28:37 +0800 Subject: [PATCH 3/5] add RecordIODataLoader part --- doc/allreduce.md | 102 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 78 insertions(+), 24 deletions(-) diff --git a/doc/allreduce.md b/doc/allreduce.md index 7cd0d764..7b183cab 100644 --- a/doc/allreduce.md +++ b/doc/allreduce.md @@ -2,9 +2,14 @@ ## Introduction -Data parallelism enables distributed training by communicating gradients before the optimizer step to make sure that parameters of all model replicas are updated using exactly the same set of gradients, and hence model replicas can stay consistent across iterations. +Data parallelism enables distributed training by communicating gradients +before the optimizer step to make sure that parameters of all model replicas +are updated using exactly the same set of gradients, +and hence model replicas can stay consistent across iterations. -AllReduce is a common strategy to communicating gradients in data parallelism. Following is the persudo code describing the training procedures under AllReduce strategy. +AllReduce is a common strategy for communicating gradients in data parallelism. +Following is the pseudocode describing the training procedures +under AllReduce strategy. ```python broadcast(parameters, rank=0) @@ -16,23 +21,43 @@ while True: update() ``` -First, we broadcast model parameters of rank 0 to other processes. Then, each process loads a minibatch of training data, does forward/backward computation, and gets the gradients. We launch AllReduce to communicate gradients among the processes. At last, we update parameters in each process individually. +First, we broadcast model parameters of rank 0 to other processes. +Each process loads a minibatch of training data, +does forward/backward computation, and gets the gradients. +We launch AllReduce to communicate gradients among the processes. +At last, we update the parameters in each process individually. ## AllReduce in PyTorch -Before discussing how to support AllReduce in GoTorch, it's necessary to have a thorough suvery on current implementation of AllReduce in PyTorch. +Before discussing how to support AllReduce in GoTorch, +it's necessary to have a thorough suvey on +the current implementation of AllReduce in PyTorch. -PyTorch offers several tools to facilitate distributed training, including [DataParallel](https://pytorch.org/docs/stable/generated/torch.nn.DataParallel.html#torch.nn.DataParallel) for single-process multi-thread data parallel training using multiple GPUs on the same machine, [DistributedDataParallel](https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html#torch.nn.parallel.DistributedDataParallel) for multi-process data parallel training across GPUs and machines. +PyTorch offers several tools to facilitate distributed training, +including [DataParallel](https://pytorch.org/docs/stable/generated/torch.nn.DataParallel.html#torch.nn.DataParallel) +for single-process multi-thread data parallel training +using multiple GPUs on the same machine, +[DistributedDataParallel](https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html#torch.nn.parallel.DistributedDataParallel) +for multi-process data parallel training +across GPUs and machines. -Single-process multi-GPU is not the recommended mode now, becase of its overhead of scatter/gather and GIL contention in every forward pass. So, let's focus on DistributedDataParallel. +Single-process multi-GPU is not the recommended mode, +becase of its overhead of scatter/gather and GIL contention in every forward pass. +So, let's focus on DistributedDataParallel. ### Collective Communication Library -PyTorch could use different collective communication libraries as the backend, including [NCCL](https://developer.nvidia.com/nccl) and [Gloo](https://github.com/facebookincubator/gloo). NCCL supports GPU, while Gloo supports both CPU and GPU. The performance on GPU of NCCL is better than Gloo. So we use NCCL in GPU training, and Gloo in CPU training. +PyTorch could use different collective communication libraries as the backend, +including [NCCL](https://developer.nvidia.com/nccl) and [Gloo](https://github.com/facebookincubator/gloo). +NCCL supports GPU, while Gloo supports both CPU and GPU. +The performance on GPU of NCCL is better than Gloo. +So we use NCCL in GPU training, and Gloo in CPU training. -Besides, PyTorch provides a library, [c10d](https://github.com/pytorch/pytorch/tree/master/torch/lib/c10d), which wrappers NCCL/Gloo, to manipulate `torch::Tensor` directly. It brings much convenience. +Besides, PyTorch provides a library, [c10d](https://github.com/pytorch/pytorch/tree/master/torch/lib/c10d), +which wrappers NCCL/Gloo, to manipulate `torch::Tensor` directly. +It brings much convenience. -The following is an example: +Following is an example: ```cpp #include @@ -71,41 +96,72 @@ int main(int argc, char** argv) { ### DistributedSampler -The training samples are partitioned statically in distributed training of PyTorch. The [DistributedSampler](https://pytorch.org/docs/stable/_modules/torch/utils/data/distributed.html#DistributedSampler) generates a sequence of indices of training samples for each training process. Then, each training process load subset samples by the indices. +The training samples are partitioned statically in distributed training of PyTorch. +The [DistributedSampler](https://pytorch.org/docs/stable/_modules/torch/utils/data/distributed.html#DistributedSampler) +generates a sequence of indices of training samples for each training process. +Then, each process loads a subset samples by the indices. **Note:** The dataset is assumed to be of constant size. ### Launch Utility -The `torch.distributed` package provides a launch utility in [torch.distributed.launch](https://github.com/pytorch/pytorch/blob/master/torch/distributed/launch.py). This helper utility can be used to launch multiple processes per node for distributed training. If the utility is used for GPU training, each distributed process will be operating on a single GPU. +The `torch.distributed` package provides a launch utility in +[torch.distributed.launch](https://github.com/pytorch/pytorch/blob/master/torch/distributed/launch.py). +This helper utility can be used to +launch multiple processes per node for distributed training. +If the utility is used for GPU training, +each distributed process will be operating on a single GPU. ### Optimization -The naive implementation of training procedures in [Introduction](#Introduction) section has two performance concerns: +The naive implementation of training procedures +in [Introduction](#Introduction) section has two performance concerns: -- Collective communication performs poorly on small tensors, which will be especially prominent on large models with massive numbers of small parameters. - -- Separating gradient computation and synchronization forfeits the opportunity to overlap computation with communication due to the hard boundary in between. +- Collective communication performs poorly on +small tensors, which will be especially prominent on large models +with massive numbers of small parameters. +- Separating gradient computation and synchronization forfeits the opportunity +to overlap computation with communication due to the hard boundary in between. PyTorch does more optimizations to solve these two problems: - Bucketing gradients to reduce AllReduce kernels overhead. -- Registering AllReduce kernels as autograd hooks to overlap communication and computation. +- Registering AllReduce kernels as autograd hooks +to overlap communication and computation. For more details, please refer to the [paper](https://arxiv.org/abs/2006.15704). - ## AllReduce in GoTorch -We plan to implement the functionalities of DistributedDataParallel gradually in GoTorch. At stage 1, we provide a naive solution. A MNIST distributed example is the target in this stage. At stage 2, we will provide a optimized solution. Bucketing gradients and registering hooks will be implemented at this stage. +We plan to implement the functionalities of +DistributedDataParallel gradually in GoTorch. +At stage 1, we provide a naive solution. +An MNIST distributed example is the target in this stage. +At stage 2, we will provide an optimized solution. +Bucketing gradients and registering hooks will be implemented at this stage. ### RecordIODataLoader +The RecordIO format is a simple format for a sequence of binary records. +It provides a way to seek the beginning of any record in a file. +We could partition the data in RecordIO format into training processes. +At stage 1, we support static sharding only. +Following are the steps of static sharding in distributed training: + +1. Convert samples into RecordIO format. +1. Partition records into several tasks. Each task contains +a sequence of `{file, start_idx, end_idx}` structs. +1. Shuffle tasks and assign a subset of tasks to a training process. -### Go wrapper of c10d Library +1. Decode records in tasks and feed to the neural network. -[ProcessGroupNCCL](https://github.com/pytorch/pytorch/blob/master/torch/lib/c10d/ProcessGroupNCCL.hpp) implements NCCL bindings for c10d library. After adding a Go wrapper of this class, we could doing allreduce on torch tensors in Go. +### Go Wrapper of c10d Library + +[ProcessGroupNCCL](https://github.com/pytorch/pytorch/blob/master/torch/lib/c10d/ProcessGroupNCCL.hpp) +implements NCCL bindings for c10d library. +After adding a Go wrapper of this class, +we could do allreduce on torch tensors in Go. ### Go Launch Utility @@ -117,7 +173,5 @@ TBD ## Reference -- https://pytorch.org/docs -- https://arxiv.org/abs/2006.15704 - - +- +- From cd6a379fece732956333fb976e63d0bd4dd21390 Mon Sep 17 00:00:00 2001 From: qijun Date: Wed, 28 Oct 2020 12:08:01 +0800 Subject: [PATCH 4/5] fix ci --- doc/allreduce.md | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/doc/allreduce.md b/doc/allreduce.md index 7b183cab..a95c5494 100644 --- a/doc/allreduce.md +++ b/doc/allreduce.md @@ -118,16 +118,16 @@ The naive implementation of training procedures in [Introduction](#Introduction) section has two performance concerns: - Collective communication performs poorly on -small tensors, which will be especially prominent on large models -with massive numbers of small parameters. + small tensors, which will be especially prominent on large models + with massive numbers of small parameters. - Separating gradient computation and synchronization forfeits the opportunity -to overlap computation with communication due to the hard boundary in between. + to overlap computation with communication due to the hard boundary in between. PyTorch does more optimizations to solve these two problems: - Bucketing gradients to reduce AllReduce kernels overhead. - Registering AllReduce kernels as autograd hooks -to overlap communication and computation. + to overlap communication and computation. For more details, please refer to the [paper](https://arxiv.org/abs/2006.15704). @@ -149,11 +149,9 @@ At stage 1, we support static sharding only. Following are the steps of static sharding in distributed training: 1. Convert samples into RecordIO format. - 1. Partition records into several tasks. Each task contains -a sequence of `{file, start_idx, end_idx}` structs. + a sequence of `{file, start_idx, end_idx}` structs. 1. Shuffle tasks and assign a subset of tasks to a training process. - 1. Decode records in tasks and feed to the neural network. ### Go Wrapper of c10d Library From 89bee36f5faa717805c1db909644b399f37fc23b Mon Sep 17 00:00:00 2001 From: qijun Date: Wed, 28 Oct 2020 14:30:43 +0800 Subject: [PATCH 5/5] polish doc --- doc/allreduce.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/allreduce.md b/doc/allreduce.md index a95c5494..9c6baf6b 100644 --- a/doc/allreduce.md +++ b/doc/allreduce.md @@ -144,13 +144,13 @@ Bucketing gradients and registering hooks will be implemented at this stage. The RecordIO format is a simple format for a sequence of binary records. It provides a way to seek the beginning of any record in a file. -We could partition the data in RecordIO format into training processes. +We could partition the RecordIO data and assgin to training processes. At stage 1, we support static sharding only. Following are the steps of static sharding in distributed training: 1. Convert samples into RecordIO format. 1. Partition records into several tasks. Each task contains - a sequence of `{file, start_idx, end_idx}` structs. + one or more `{file, start_idx, end_idx}` tuples. 1. Shuffle tasks and assign a subset of tasks to a training process. 1. Decode records in tasks and feed to the neural network.