Fix Asynchronous Training Bug #359

Open · wants to merge 4 commits into base: master
byteps/server/server.cc (16 additions, 7 deletions)

```diff
@@ -48,10 +48,19 @@ void SendPushResponse(uint64_t key, const ps::KVMeta& req,
 void SendPullResponse(const DataHandleType type, const uint64_t key,
                       const ps::KVMeta& req_meta, ps::KVServer<char>* server) {
   std::lock_guard<std::mutex> lock(pullresp_mu_);
-  auto& updates = update_buf_[key];
-  CHECK(updates.merged.tensor) << "init " << key << " first";
-  char* data = updates.merged.tensor;
-  auto len = updates.merged.len;
+  char* data;
+  size_t len;
+  if (sync_mode_) {
+    auto& updates = update_buf_[key];
+    CHECK(updates.merged.tensor) << "init " << key << " first";
+    data = updates.merged.tensor;
+    len = updates.merged.len;
+  } else {
+    auto stored = store_.at(key);
+    CHECK(stored.tensor) << "init " << key << " first";
+    data = stored.tensor;
+    len = stored.len;
+  }

   // send pull response
   auto iterator = pull_response_map_.find(key);
@@ -281,8 +290,8 @@ void BytePSHandler(const ps::KVMeta& req_meta,
       stored->dtype = type.dtype;
       CHECK(stored->tensor);

-      bps_reducer_->copy(stored->tensor, recved,
-                         len);  // we may not need this copy
+      memset(stored->tensor, 0, stored->len);
+
       for (const auto& req : updates.request) {
         SendPushResponse(key, req, server);
       }
@@ -508,7 +517,7 @@ extern "C" void byteps_server() {
       free(it.second.tensor);
     }
   }
-
+  LOG(INFO) << "byteps has been shutdown";
   return;
 }
```
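The substantive server change is in `SendPullResponse`. In synchronous mode, a pull must return the merged (aggregated) update for a key, which lives in `update_buf_`; in asynchronous mode there is no merge step, so the freshest state is the tensor in `store_`, and serving `update_buf_` there could return a stale or never-initialized buffer. The second hunk zeroes a newly initialized buffer with `memset` instead of copying in the first received tensor, and the last hunk adds a shutdown log line. Below is a minimal Python model of the corrected pull-path selection; this is an illustrative sketch, not BytePS code, with `sync_mode`, `update_buf`, and `store` standing in for the server members `sync_mode_`, `update_buf_`, and `store_`.

```python
from collections import namedtuple

# Hypothetical stand-ins for the server's stored-array and update-buffer
# structs; the real types are defined in the BytePS server sources.
Array = namedtuple("Array", ["tensor", "len"])
UpdateBuf = namedtuple("UpdateBuf", ["merged"])

def pull_response_source(key, sync_mode, update_buf, store):
    """Return (data, length) for a pull response, mirroring the fixed logic."""
    if sync_mode:
        # Synchronous training: workers must see the merged (aggregated)
        # update, which only exists in the update buffer.
        merged = update_buf[key].merged
        assert merged.tensor is not None, "init %s first" % key
        return merged.tensor, merged.len
    # Asynchronous training: there is no merge step; the freshest parameter
    # state is whatever the server currently stores for this key.
    stored = store[key]
    assert stored.tensor is not None, "init %s first" % key
    return stored.tensor, stored.len

# Tiny self-check: async pulls serve the store, sync pulls the merged buffer.
store = {42: Array(tensor=b"\x01\x02", len=2)}
update_buf = {42: UpdateBuf(merged=Array(tensor=b"\x03\x04", len=2))}
assert pull_response_source(42, False, update_buf, store)[0] == b"\x01\x02"
assert pull_response_source(42, True, update_buf, store)[0] == b"\x03\x04"
```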
byteps/torch/__init__.py (12 additions, 6 deletions)

```diff
@@ -55,7 +55,8 @@ def __init__(self, params, named_parameters, compression,
                              'tuples (name, parameter), usually produced by '
                              'model.named_parameters().')

-        dups = _DistributedOptimizer.find_duplicates([k for k, _ in named_parameters])
+        dups = _DistributedOptimizer.find_duplicates(
+            [k for k, _ in named_parameters])
         if len(dups) > 0:
             raise ValueError('Parameter names in named_parameters must be unique. '
                              'Found duplicates: %s' % ', '.join(dups))
@@ -70,7 +71,8 @@ def __init__(self, params, named_parameters, compression,
             # https://github.com/pytorch/pytorch/issues/7733
             self._parameter_names = {v.__hash__(): k for k, v
                                      in sorted(named_parameters)}
-            self._tensor_list = [tensor for name, tensor in named_parameters]
+            self._tensor_list = [tensor for name,
+                                 tensor in named_parameters]
         else:
             self._is_tensor_instance = False
             self._parameter_names = {v: k for k, v
@@ -134,7 +136,8 @@ def _push_pull_grad_async(self, p):
         else:
             tensor = p.grad
         tensor_compressed, ctx = self._compression.compress(tensor)
-        handle = byteps_push_pull(tensor_compressed, average=True, name="Gradient."+name)
+        handle = byteps_push_pull(
+            tensor_compressed, average=True, name="Gradient."+name)
         return handle, ctx

     def _make_hook(self, p):
@@ -176,7 +179,8 @@ def synchronize(self):
     @contextmanager
     def skip_synchronize(self):
         if self._enable_async:
-            raise AssertionError("skip_synchronize cannot be used in async training")
+            raise AssertionError(
+                "skip_synchronize cannot be used in async training")
         self._should_sync = False
         try:
             yield
@@ -201,7 +205,8 @@ def step(self, closure=None):
                     name = self._parameter_names.get(p.__hash__())
                 else:
                     name = self._parameter_names.get(p)
-                handle = byteps_push_pull(p, average=False, name="AsyncParam."+name)
+                handle = byteps_push_pull(
+                    p, average=False, name="Parameter."+name)
                 _, ctx = self._compression.compress(p)
                 self._handles[p] = (handle, ctx)

@@ -378,7 +383,8 @@ def _from_tensor():
             key = '%s.%d' % (option_key, index)
             dtypes = _get_types(option_value)
             option_tensor = torch.Tensor([option_value]).cuda()
-            callbacks[key] = _create_option_callback(index, option_key, option_tensor, dtypes)
+            callbacks[key] = _create_option_callback(
+                index, option_key, option_tensor, dtypes)
             params.append((key, option_tensor))

     # The params list here is ordered by the layers in the model
```
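Most of the Python hunks are pure line-wrapping; the one behavioral change is in `step()`, where asynchronous mode now pushes parameters under `"Parameter."+name` instead of `"AsyncParam."+name`, presumably so the push addresses the same server key the parameter was registered under at startup rather than a second, never-initialized key. A hedged end-to-end usage sketch follows: it assumes `BYTEPS_ENABLE_ASYNC=1` is the switch for asynchronous training (as described in the BytePS documentation) and uses a placeholder model; only the standard `byteps.torch` entry points are called.

```python
# Usage sketch for asynchronous training with byteps.torch (assumptions
# hedged above; the model and hyperparameters are placeholders).
import os
os.environ.setdefault("BYTEPS_ENABLE_ASYNC", "1")  # set before bps.init()

import torch
import byteps.torch as bps

bps.init()
torch.cuda.set_device(bps.local_rank())

model = torch.nn.Linear(10, 2).cuda()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

# In async mode, each step() pushes the updated parameters to the server
# and pulls back the server's current state under "Parameter.<name>",
# the key this PR fixes.
optimizer = bps.DistributedOptimizer(
    optimizer, named_parameters=model.named_parameters())

# Registers each parameter with the server before training starts.
bps.broadcast_parameters(model.state_dict(), root_rank=0)

for _ in range(10):
    optimizer.zero_grad()
    loss = model(torch.randn(4, 10).cuda()).sum()
    loss.backward()
    optimizer.step()
```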