Merge branch 'master' into arashb/falcon

loadams authored Dec 11, 2023
2 parents f4a340a + b186816 commit ee7fc0f
Showing 21 changed files with 227 additions and 105 deletions.
4 changes: 2 additions & 2 deletions accelerator/cpu_accelerator.py
@@ -63,7 +63,7 @@ def random(self):
         return torch.random

     def set_rng_state(self, new_state, device_index=None):
-        if device_index == None:
+        if device_index is None:
             return torch.set_rng_state(new_state)
         return torch.set_rng_state(new_state, device_index)

@@ -253,7 +253,7 @@ def on_accelerator(self, tensor):
     # create an instance of op builder and return, name specified by class_name
     def create_op_builder(self, op_name):
         builder_class = self.get_op_builder(op_name)
-        if builder_class != None:
+        if builder_class is not None:
             return builder_class()
         return None
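Most of the accelerator-side edits in this commit are the same mechanical substitution: comparisons against None move from ==/!= to is/is not. A minimal standalone sketch (not taken from the repository) of why the identity form is the safer idiom:

# Standalone illustration only: why `is None` beats `== None`.

class AlwaysEqual:
    """A contrived type whose __eq__ claims equality with anything."""

    def __eq__(self, other):
        return True


obj = AlwaysEqual()

print(obj == None)   # True  -- __eq__ can lie about being None
print(obj is None)   # False -- identity cannot be overridden

The same substitution repeats in cuda_accelerator.py, mps_accelerator.py, npu_accelerator.py and several module_inject files below.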
4 changes: 2 additions & 2 deletions accelerator/cuda_accelerator.py
@@ -44,7 +44,7 @@ def is_synchronized_device(self):

     # Device APIs
     def device_name(self, device_index=None):
-        if device_index == None:
+        if device_index is None:
             return 'cuda'
         return 'cuda:{}'.format(device_index)

@@ -280,7 +280,7 @@ def op_builder_dir(self):
     class_dict = None

     def _lazy_init_class_dict(self):
-        if self.class_dict != None:
+        if self.class_dict is not None:
             return
         else:
             self.class_dict = {}
4 changes: 2 additions & 2 deletions accelerator/mps_accelerator.py
@@ -26,7 +26,7 @@ def is_synchronized_device(self):

     # Device APIs
     def device_name(self, device_index=None):
-        if device_index == None:
+        if device_index is None:
             return "mps"
         return "mps:{}".format(device_index)

@@ -221,7 +221,7 @@ def op_builder_dir(self):
     # create an instance of op builder, specified by class_name
     def create_op_builder(self, op_name):
         builder_class = self.get_op_builder(op_name)
-        if builder_class != None:
+        if builder_class is not None:
             return builder_class()
         return None
2 changes: 1 addition & 1 deletion accelerator/npu_accelerator.py
@@ -30,7 +30,7 @@ def is_synchronized_device(self):

     # Device APIs
     def device_name(self, device_index=None):
-        if device_index == None:
+        if device_index is None:
             return 'npu'
         return 'npu:{}'.format(device_index)
2 changes: 1 addition & 1 deletion accelerator/real_accelerator.py
@@ -45,7 +45,7 @@ def _validate_accelerator(accel_obj):


 def is_current_accelerator_supported():
-    return get_accelerator() in SUPPORTED_ACCELERATOR_LIST
+    return get_accelerator().device_name() in SUPPORTED_ACCELERATOR_LIST


 def get_accelerator():
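The real_accelerator.py change is a behavioural fix rather than a style one: get_accelerator() returns an accelerator object, while SUPPORTED_ACCELERATOR_LIST holds device-name strings, so the old membership test could never succeed. A simplified stand-in sketch (the class and the list contents here are illustrative, not copied from DeepSpeed):

# Illustrative sketch only; simplified stand-ins for the real DeepSpeed objects.

SUPPORTED_ACCELERATOR_LIST = ['cuda', 'cpu', 'npu', 'mps']  # device-name strings (assumed)


class CudaAccelerator:

    def device_name(self, device_index=None):
        return 'cuda' if device_index is None else f'cuda:{device_index}'


accel = CudaAccelerator()

# Old check: an object compared against a list of strings -> always False.
print(accel in SUPPORTED_ACCELERATOR_LIST)                 # False

# New check: compare the device-name string instead.
print(accel.device_name() in SUPPORTED_ACCELERATOR_LIST)   # True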
2 changes: 1 addition & 1 deletion deepspeed/inference/quantization/layers.py
@@ -86,7 +86,7 @@ def __init__(self, config: Dict, pre_quant_layer: nn.Embedding) -> None:
                                       device=pre_quant_layer.weight.device,
                                       dtype=pre_quant_layer.weight.dtype)

-        assert pre_quant_layer.max_norm == None, 'Not supported'
+        assert pre_quant_layer.max_norm is None, 'Not supported'
         assert pre_quant_layer.norm_type == 2, 'Not supported'
         assert pre_quant_layer.scale_grad_by_freq == False, 'Not supported'
         assert pre_quant_layer.sparse == False, 'Not supported'

@@ -32,7 +32,7 @@ def supports_config(config: DSEmbeddingsConfig) -> bool:
         if config.use_token_type:
             return False

-        if config.output_normalization != None:
+        if config.output_normalization is not None:
             return False

         try:
2 changes: 1 addition & 1 deletion deepspeed/module_inject/auto_tp.py
@@ -451,7 +451,7 @@ def get_model_num_kv_heads(self, config):
         for name in kv_head_names:
             if hasattr(config, name):
                 num_kv_heads = getattr(config, name)
-                if num_kv_heads != None:
+                if num_kv_heads is not None:
                     break
         return num_kv_heads
2 changes: 1 addition & 1 deletion deepspeed/module_inject/fusedqkv_utils.py
@@ -28,7 +28,7 @@ def require_tp_fused_qkvw(name, mp_size):


 def prepare_tp_fused_qkvw(module_str, src, mp_size, gpu_index):
-    if src == None:
+    if src is None:
         return
     fused_type_dict = {
         'CodeGenBlock': 'codegentype',
2 changes: 1 addition & 1 deletion deepspeed/module_inject/replace_module.py
@@ -597,7 +597,7 @@ def skip_level_0_prefix(model, state_dict):
     if key is None:
         key = re.match(r"(.*?)Model", model)
     # if keys start with 'model.', don't skip level 0 prefix
-    if state_dict != None:
+    if state_dict is not None:
         for item in state_dict.keys():
             if re.match("^model[.]", item):
                 return False
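For context on the guard above, skip_level_0_prefix keeps the level-0 prefix whenever any checkpoint key starts with 'model.'. A tiny standalone illustration of that key test, using hypothetical keys:

import re

# Hypothetical checkpoint keys, just to show what the regex above matches.
state_dict_keys = [
    "model.embed_tokens.weight",    # starts with "model." -> prefix is not skipped
    "transformer.h.0.attn.weight",  # no match
]

for key in state_dict_keys:
    print(key, bool(re.match("^model[.]", key)))
# model.embed_tokens.weight True
# transformer.h.0.attn.weight False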
4 changes: 2 additions & 2 deletions deepspeed/module_inject/tp_shard.py
@@ -20,8 +20,8 @@ def get_num_kv_heads():
 def get_shard_size(total_size, mp_size, rank=None):
     global num_kv_heads
     # When we have num_kv_heads defined, uneven division is possible, otherwise enforce even division
-    if num_kv_heads != None:
-        if (rank == None):
+    if num_kv_heads is not None:
+        if rank is None:
             rank = dist.get_rank()
         my_slices = (num_kv_heads // mp_size) + (1 if rank < (num_kv_heads % mp_size) else 0)
         return total_size * my_slices // num_kv_heads
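The uneven-sharding arithmetic in get_shard_size is easiest to follow with concrete numbers. Below is a standalone re-implementation of just the formula shown above (rank passed explicitly rather than taken from dist.get_rank()), purely for illustration:

def shard_size(total_size, mp_size, rank, num_kv_heads):
    # Mirrors the uneven-division branch of get_shard_size above:
    # the first (num_kv_heads % mp_size) ranks own one extra KV head.
    my_slices = (num_kv_heads // mp_size) + (1 if rank < (num_kv_heads % mp_size) else 0)
    return total_size * my_slices // num_kv_heads


# Example: hidden size 5120, tensor-parallel degree 4, 5 KV heads.
# Rank 0 owns 2 heads, ranks 1-3 own 1 head each.
print([shard_size(5120, 4, r, 5) for r in range(4)])  # [2048, 1024, 1024, 1024]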
124 changes: 124 additions & 0 deletions deepspeed/runtime/comm/hccl.py
@@ -0,0 +1,124 @@
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

import numpy as np
import torch
import torch_npu
import deepspeed.comm as dist


class HcclBackend(object):

    def __init__(self, mpu=None):
        if mpu is None:
            self.world_group = dist.new_group(ranks=range(dist.get_world_size()))
        else:
            self.mpu = mpu
            self.world_group = self.mpu.get_data_parallel_group()
        self.size = dist.get_world_size(group=self.world_group)
        self.rank = dist.get_rank(group=self.world_group)

    def my_igather(self, rank, size, group, sendbuf, recvbuf, root):
        req = []
        if rank == root:
            for idx in range(size):
                if idx != rank:
                    req.append(dist.irecv(recvbuf[idx], src=idx, group=group))
                else:
                    recvbuf[rank] = sendbuf
        else:
            req.append(dist.isend(sendbuf, group=group, dst=root))
        return req

    def my_gather(self, rank, size, group, sendbuf, recvbuf, root):
        if rank == root:
            for idx in range(size):
                if idx != rank:
                    dist.recv(recvbuf[idx], src=idx, group=group)
                else:
                    recvbuf[rank] = sendbuf
        else:
            dist.send(sendbuf, group=group, dst=root)

    def compressed_allreduce(self, buffer_m: torch.tensor, worker_error, server_error, local_rank):
        original_shape = buffer_m.size()
        if len(original_shape) > 1:
            buffer_m = torch.flatten(buffer_m)

        # align size of original_buffer and error
        original_size = buffer_m.numel()
        worker_error_size = worker_error.numel()
        if original_size != worker_error_size:
            empty_tensor = torch.zeros(worker_error_size - original_size, device=buffer_m.device)
            buffer_m = torch.cat([buffer_m, empty_tensor])

        buffer_m.add_(worker_error)
        worker_scale = torch.linalg.norm(buffer_m) / np.sqrt(torch.numel(buffer_m))

        worker_error.set_(buffer_m - worker_scale * buffer_m.sign().add_(1).bool().float().add_(-0.5).mul_(2.0))

        sign_list_packed_tmp = torch_npu.npu_sign_bits_pack(buffer_m, self.size).type(torch.int8)

        recvbuf_sign = torch.zeros([self.size, len(sign_list_packed_tmp[self.rank])],
                                   dtype=sign_list_packed_tmp[0].dtype,
                                   device=sign_list_packed_tmp.device)

        sign_list_packed = [sign_list_packed_tmp[idx] for idx in range(self.size)]

        recvbuf_scale = [
            torch.zeros(1, dtype=worker_scale.dtype, device=torch.device(local_rank)) for _ in range(self.size)
        ]

        # communication phase 1
        # all to all for sign
        dist.all_to_all_single(recvbuf_sign, torch.stack(sign_list_packed), group=self.world_group)
        # all gather for scale
        dist.all_gather(recvbuf_scale, worker_scale, group=self.world_group)

        flattened_recvbuf_sign = recvbuf_sign.type(torch.uint8).flatten()
        compensated_server_m = torch_npu.npu_sign_bits_unpack(flattened_recvbuf_sign, self.size, torch.float32) \
            .mul_(torch.stack(recvbuf_scale).mul_(1 / self.size)).sum(0)

        compensated_server_m.add_(server_error)

        server_scale = torch.norm(compensated_server_m) / np.sqrt(compensated_server_m.numel())

        server_error.set_(compensated_server_m -
                          server_scale * compensated_server_m.sign().add_(1).bool().float().add_(-0.5).mul_(2.0))

        server_sign_packed = torch_npu.npu_sign_bits_pack(compensated_server_m, 1).type(torch.int8)

        # recvbuf_sign_server
        recvbuf_sign_server_tmp = torch.zeros([self.size, len(server_sign_packed[0])],
                                              dtype=recvbuf_sign.dtype,
                                              device=server_sign_packed.device)

        recvbuf_sign_server = [recvbuf_sign_server_tmp[idx] for idx in range(self.size)]

        # recvbuf_scale_server
        recvbuf_scale_server_tmp = torch.zeros([self.size, 1],
                                               dtype=worker_scale.dtype,
                                               device=server_sign_packed.device)

        recvbuf_scale_server = [recvbuf_scale_server_tmp[idx] for idx in range(self.size)]

        # communication Phase 2
        dist.all_gather(recvbuf_sign_server, server_sign_packed[0], group=self.world_group)
        dist.all_gather(recvbuf_scale_server, server_scale, group=self.world_group)

        recvbuf_sign_server = torch.stack(recvbuf_sign_server)

        flattened_recvbuf_sign_server = recvbuf_sign_server.type(torch.uint8).flatten()

        buffer_m.data.copy_(
            torch_npu.npu_sign_bits_unpack(flattened_recvbuf_sign_server, self.size,
                                           torch.float32).mul_(recvbuf_scale_server_tmp).flatten().data)

        if original_size != worker_error_size:
            buffer_m = buffer_m[0:original_size]
        if len(original_shape) > 1:
            buffer_m = buffer_m.reshape(original_shape)

        return buffer_m
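compressed_allreduce above is the usual 1-bit compression with error feedback, with the sign packing/unpacking and the collectives delegated to torch_npu and deepspeed.comm. The following device-agnostic sketch re-states only the compression math in plain torch; it is an illustration of the technique, not a drop-in replacement for the NPU kernels:

import torch


def one_bit_compress(buffer_m: torch.Tensor, worker_error: torch.Tensor):
    """Sign/scale compression with error feedback, mirroring the math in
    HcclBackend.compressed_allreduce (packing and communication omitted)."""
    # Fold the residual from the previous step back in before compressing.
    buffer_m = buffer_m + worker_error

    # One shared scale per tensor: ||x|| / sqrt(numel).
    scale = torch.linalg.norm(buffer_m) / buffer_m.numel() ** 0.5

    # The sign term maps {negative -> -1, zero/positive -> +1}, which is what
    # sign().add_(1).bool().float().add_(-0.5).mul_(2.0) computes above.
    signs = torch.where(buffer_m >= 0, torch.ones_like(buffer_m), -torch.ones_like(buffer_m))

    # New residual: whatever the 1-bit representation could not capture.
    new_error = buffer_m - scale * signs
    return signs, scale, new_error


x = torch.randn(8)
err = torch.zeros(8)
signs, scale, err = one_bit_compress(x, err)
print(signs, scale)          # +/-1 pattern plus a single floating-point scale
print(scale * signs + err)   # decompressed value plus residual recovers x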
11 changes: 5 additions & 6 deletions deepspeed/runtime/fp16/onebit/adam.py
@@ -70,8 +70,6 @@ def __init__(self,

         super(OnebitAdam, self).__init__(params, defaults)
         self.eps_mode = 0 if eps_inside_sqrt else 1
-        assert (dist.is_initialized())
-
         self.comm_time = 0.0
         self.step_time = 0.0
         self.ave_step = 1
@@ -86,22 +84,23 @@ def __init__(self,

         self.comm_backend_name = comm_backend_name

+        assert dist.is_initialized(), "Please initialize the torch distributed backend."
         # Empty initializer. Set handle based on the comm backend as follows.
         self.comm_backend_handle = None

         if self.comm_backend_name == 'nccl':
             assert (
                 required_torch_version(min_version=1.8)
             ), "Please use torch 1.8 or greater to enable NCCL backend in 1-bit Adam. Alternatively, please specify 'mpi' as the 'comm_backend_name' in config file to proceed with the MPI backend"
-            assert dist.is_initialized() == True, "Please initialize the torch distributed backend."
             from deepspeed.runtime.comm.nccl import NcclBackend
             self.using_pipeline = hasattr(self.deepspeed, 'pipeline_enable_backward_allreduce')
             self.comm_backend_handle = NcclBackend(self.deepspeed.mpu)

         elif self.comm_backend_name == 'mpi':
             from deepspeed.runtime.comm.mpi import MpiBackend
             self.comm_backend_handle = MpiBackend(cuda_aware)

+        elif self.comm_backend_name == 'hccl':
+            from deepspeed.runtime.comm.hccl import HcclBackend
+            self.using_pipeline = hasattr(self.deepspeed, 'pipeline_enable_backward_allreduce')
+            self.comm_backend_handle = HcclBackend(self.deepspeed.mpu)
         self.size = self.comm_backend_handle.size

         self.divider = int(self.size * 8 / np.gcd(self.size, 8))
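With the new elif branch, routing 1-bit Adam over HCCL is a configuration choice. A hedged sketch of a DeepSpeed config as a Python dict; the optimizer type and parameter names follow the documented 1-bit Adam options, and the numeric values are placeholders, not recommendations:

# Illustrative DeepSpeed config enabling 1-bit Adam over the new HCCL backend
# (Ascend NPUs via torch_npu). Values are placeholders.
ds_config = {
    "train_batch_size": 32,
    "optimizer": {
        "type": "OneBitAdam",
        "params": {
            "lr": 1e-4,
            "freeze_step": 400,           # warm-up steps before compression kicks in
            "cuda_aware": False,
            "comm_backend_name": "hccl",  # previously limited to "nccl" / "mpi"
        },
    },
    "fp16": {"enabled": True},
}

OnebitLamb and ZeroOneAdam below gain the identical 'hccl' branch, so the analogous comm_backend_name setting applies to them as well.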
12 changes: 6 additions & 6 deletions deepspeed/runtime/fp16/onebit/lamb.py
@@ -93,8 +93,6 @@ def __init__(self,

         super(OnebitLamb, self).__init__(params, defaults)
         self.eps_mode = 0 if eps_inside_sqrt else 1
-        assert (dist.is_initialized())
-
         self.deepspeed = deepspeed
         self.lamb_freeze_key = False
         self.initialize = False
@@ -108,21 +106,23 @@ def __init__(self,

         self.comm_backend_name = comm_backend_name

+        assert dist.is_initialized(), "Please initialize the torch distributed backend."
         # Empty initializer. Set handle based on the comm backend as follows.
         self.comm_backend_handle = None

         if self.comm_backend_name == 'nccl':
             assert (
                 required_torch_version(min_version=1.8)
             ), "Please use torch 1.8 or greater to enable NCCL backend in 1-bit Adam. Alternatively, please specify 'mpi' as the 'comm_backend_name' in config file to proceed with the MPI backend"
-            assert dist.is_initialized() == True, "Please initialize the torch distributed backend."
             from deepspeed.runtime.comm.nccl import NcclBackend
             self.using_pipeline = hasattr(self.deepspeed, 'pipeline_enable_backward_allreduce')
             self.comm_backend_handle = NcclBackend(self.deepspeed.mpu)

         elif self.comm_backend_name == 'mpi':
             from deepspeed.runtime.comm.mpi import MpiBackend
             self.comm_backend_handle = MpiBackend(cuda_aware)
+        elif self.comm_backend_name == 'hccl':
+            from deepspeed.runtime.comm.hccl import HcclBackend
+            self.using_pipeline = hasattr(self.deepspeed, 'pipeline_enable_backward_allreduce')
+            self.comm_backend_handle = HcclBackend(self.deepspeed.mpu)

         self.size = self.comm_backend_handle.size

@@ -161,7 +161,7 @@ def step(self, closure=None, grads=None):
         else:
             grads_group = grads

-        #remove the previous stats
+        # remove the previous stats
         del self.lamb_coeffs[:]

         if self.lamb_freeze_key:
11 changes: 5 additions & 6 deletions deepspeed/runtime/fp16/onebit/zoadam.py
@@ -83,8 +83,6 @@ def __init__(self,

         super(ZeroOneAdam, self).__init__(params, defaults)
         self.eps_mode = 0 if eps_inside_sqrt else 1
-        assert (dist.is_initialized())
-
         self.deepspeed = deepspeed
         self.initialize = False
         self.cuda_aware = cuda_aware
@@ -99,22 +97,23 @@ def __init__(self,

         self.comm_backend_name = comm_backend_name

+        assert dist.is_initialized(), "Please initialize the torch distributed backend."
         # Empty initializer. Set handle based on the comm backend as follows.
         self.comm_backend_handle = None

         if self.comm_backend_name == 'nccl':
             assert (
                 required_torch_version(min_version=1.8)
             ), "Please use torch 1.8 or greater to enable NCCL backend in 0/1 Adam. Alternatively, please specify 'mpi' as the 'comm_backend_name' in config file to proceed with the MPI backend"
-            assert dist.is_initialized() == True, "Please initialize the torch distributed backend."
             from deepspeed.runtime.comm.nccl import NcclBackend
             self.using_pipeline = hasattr(self.deepspeed, 'pipeline_enable_backward_allreduce')
             self.comm_backend_handle = NcclBackend(self.deepspeed.mpu)

         elif self.comm_backend_name == 'mpi':
             from deepspeed.runtime.comm.mpi import MpiBackend
             self.comm_backend_handle = MpiBackend(cuda_aware)

+        elif self.comm_backend_name == 'hccl':
+            from deepspeed.runtime.comm.hccl import HcclBackend
+            self.using_pipeline = hasattr(self.deepspeed, 'pipeline_enable_backward_allreduce')
+            self.comm_backend_handle = HcclBackend(self.deepspeed.mpu)
         self.size = self.comm_backend_handle.size

         self.divider = int(self.size * 8 / np.gcd(self.size, 8))