Properly check and use the number of total shards.
Total shards can't be less than 2.
abyss7 committed Mar 12, 2017
1 parent 664c212 commit e6c3039
Showing 8 changed files with 72 additions and 63 deletions.
4 changes: 2 additions & 2 deletions src/base/locked_queue_index.h
@@ -34,10 +34,10 @@ class LockedQueue<T>::Index {
}

QueueIterator Get(ui32 shard, QueueIterator begin) THREAD_UNSAFE {
CHECK(shard < index_.size());
DCHECK(!index_.empty());

QueueIterator item;
if (!index_[shard].empty()) {
if (shard < index_.size() && !index_[shard].empty()) {
item = index_[shard].front();
index_[shard].pop_front();
reverse_index_.erase(&*item);
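Note on this hunk: the old code made an out-of-range shard a hard CHECK failure, while the new code only asserts that the index itself is non-empty and treats an unknown or empty shard as "no sharded work available". Below is a minimal, hypothetical stand-in for the index that illustrates the new contract; the real LockedQueue<T>::Index also maintains a reverse_index_ (omitted here), and the fallback to |begin| is an assumption based on the visible signature. This tolerance matters once total_shards can change at runtime via coordinator updates, as the DoPoll() hunk further below shows.

#include <cstdint>
#include <deque>
#include <list>
#include <vector>

using ui32 = std::uint32_t;

template <typename T>
class ShardIndexSketch {
 public:
  using QueueIterator = typename std::list<T>::iterator;

  void Put(QueueIterator item, ui32 shard) {
    if (shard >= index_.size()) {
      index_.resize(shard + 1);  // grow lazily; unknown shards are legal
    }
    index_[shard].push_back(item);
  }

  // Mirrors the patched Get(): an out-of-range shard no longer crashes,
  // it simply yields no per-shard item.
  QueueIterator Get(ui32 shard, QueueIterator begin) {
    QueueIterator item = begin;  // assumed fallback: head of the queue
    if (shard < index_.size() && !index_[shard].empty()) {
      item = index_[shard].front();
      index_[shard].pop_front();
    }
    return item;
  }

 private:
  std::vector<std::deque<QueueIterator>> index_;  // one deque per shard
};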
2 changes: 1 addition & 1 deletion src/base/locked_queue_test.cc
@@ -142,7 +142,7 @@ TEST(LockedQueueIndexTest, GetFromHead) {
LockedQueue<int>::Index index;

const ui32 shard = 4u;
const ui32 empty_shard = 3u;
const ui32 empty_shard = 5u;

auto inserted_item = list.insert(list.end(), 1);
index.Put(inserted_item, shard);
3 changes: 2 additions & 1 deletion src/daemon/compilation_daemon.cc
@@ -261,12 +261,13 @@ bool CompilationDaemon::SearchDirectCache(
auto conf = this->conf();

DCHECK(conf->has_emitter() && !conf->has_absorber());
DCHECK(flags.has_input());

if (!cache_ || !conf->cache().direct()) {
return false;
}

DCHECK(flags.has_input());

const Version version(flags.compiler().version());
const String input = GetFullPath(current_dir, flags.input());
const CommandLine command_line(CommandLineForDirectCache(current_dir, flags));
4 changes: 2 additions & 2 deletions src/daemon/configuration.proto
@@ -51,8 +51,8 @@ message Configuration {
// Interval in seconds for polling coordinator.

repeated Host coordinators = 6;
optional uint32 total_shards = 7 [ default = 0 ];
// Coordinator should set this field: 0 means that shards are not used.
optional uint32 total_shards = 7;
// Number of total shards can't be less than 2.

optional uint32 pop_timeout = 8 [ default = 1 ];
// in seconds - can't be zero.
4 changes: 3 additions & 1 deletion src/daemon/coordinator.cc
@@ -53,7 +53,9 @@ bool Coordinator::HandleNewMessage(net::ConnectionPtr connection,
proto::Host* host = emitter->add_remotes();
host->CopyFrom(remote);
}
emitter->set_total_shards(conf()->coordinator().total_shards());
if (conf()->coordinator().has_total_shards()) {
emitter->set_total_shards(conf()->coordinator().total_shards());
}

connection->SendAsync(std::move(configuration));
return true;
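The new has_total_shards() guard is not cosmetic: total_shards lost its [ default = 0 ] in configuration.proto above, so unconditionally copying the field would materialize a zero on the emitter side, make has_total_shards() true there, and trip the stricter Check() in emitter.cc, which rejects values below 2. A small sketch of the same presence semantics, with std::optional standing in for the generated proto2 accessors (the struct and function names are hypothetical):

#include <cstdint>
#include <optional>

using ui32 = std::uint32_t;

struct CoordinatorConf { std::optional<ui32> total_shards; };
struct EmitterConf { std::optional<ui32> total_shards; };

// Copy the shard count only when the coordinator actually set it, so an
// unset field stays unset and the emitter keeps running unsharded.
void PropagateTotalShards(const CoordinatorConf& from, EmitterConf* to) {
  if (from.total_shards.has_value()) {       // has_total_shards()
    to->total_shards = *from.total_shards;   // set_total_shards(...)
  }
}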
99 changes: 49 additions & 50 deletions src/daemon/emitter.cc
@@ -296,10 +296,12 @@ void Emitter::DoCheckCache(const base::WorkerPool& pool) {
STAT(SIMPLE_CACHE_MISS);

auto hash = GenerateHash(incoming->flags(), source, extra_files);
ui32 shard = use_shards_
DCHECK(!conf->emitter().has_total_shards() ||
conf->emitter().total_shards() > 0);
ui32 shard = conf->emitter().has_total_shards()
? std::hash<Immutable>{}(hash.str.Hash(4)) %
conf->emitter().total_shards()
: 0;
: Queue::DEFAULT_SHARD;
all_tasks_->Push(std::move(*task), shard);
}
}
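The shard choice itself is a stable prefix of the task hash reduced modulo the shard count, so every emitter maps the same preprocessed source to the same remote. A self-contained sketch of the mapping, where std::string stands in for Immutable, hash_prefix for hash.str.Hash(4), and kDefaultShard assumes Queue::DEFAULT_SHARD is 0 (the value the old code used literally):

#include <cstdint>
#include <functional>
#include <string>

using ui32 = std::uint32_t;

constexpr ui32 kDefaultShard = 0;  // assumption: Queue::DEFAULT_SHARD == 0

// Map a task to a shard. The caller must guarantee total_shards > 0 when
// sharding is enabled, as the DCHECK in the hunk above does.
ui32 SelectShard(const std::string& hash_prefix, bool has_total_shards,
                 ui32 total_shards) {
  return has_total_shards
             ? static_cast<ui32>(std::hash<std::string>{}(hash_prefix) %
                                 total_shards)
             : kDefaultShard;
}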
@@ -370,6 +372,8 @@ void Emitter::DoLocalExecute(const base::WorkerPool& pool) {

void Emitter::DoRemoteExecute(const base::WorkerPool& pool, ResolveFn resolver,
const ui32 shard) {
auto conf = this->conf();

net::EndPointPtr end_point;
ui32 sleep_period = 1;
auto Sleep = [&sleep_period]() mutable {
@@ -411,7 +415,7 @@ void Emitter::DoRemoteExecute(const base::WorkerPool& pool, ResolveFn resolver,
}

// If we're using shards, we should have generated the source by now.
DCHECK(!use_shards_ || !source.str.empty());
DCHECK(!conf->emitter().has_total_shards() || !source.str.empty());

auto outgoing = std::make_unique<proto::Remote>();
if (source.str.empty() && !GenerateSource(incoming, &source)) {
@@ -584,7 +588,9 @@ void Emitter::DoPoll(const base::WorkerPool& pool,
reply->GetExtension(Configuration::extension).emitter();
new_conf.mutable_emitter()->mutable_remotes()->CopyFrom(
emitter.remotes());
new_conf.mutable_emitter()->set_total_shards(emitter.total_shards());
if (emitter.has_total_shards()) {
new_conf.mutable_emitter()->set_total_shards(emitter.total_shards());
}
if (!Update(new_conf)) {
// FIXME(ilezhankin): print coordinator's address for clarity.
LOG(WARNING) << "Failed to update to configuration from coordinator!";
@@ -619,58 +625,56 @@ bool Emitter::Check(const Configuration& conf) const {

const auto& emitter = conf.emitter();

if (emitter.only_failed()) {
bool has_active_remote = false;
// We should always have enabled remotes, even with active coordinators.
// Otherwise we risk never getting any remotes and staying silent about it.

for (const auto& remote : emitter.remotes()) {
if (!remote.disabled()) {
has_active_remote = true;
break;
}
}

if (!has_active_remote) {
LOG(ERROR) << "Daemon will hang without active remotes "
"and a set flag \"emitter.only_failed\"";
if (emitter.has_total_shards()) {
if (emitter.total_shards() > max_total_shards) {
LOG(ERROR) << "Due to peculiarities of implementation emitter can't use "
"more than "
<< max_total_shards << " total shards";
return false;
} else if (emitter.total_shards() < 2) {
LOG(ERROR) << "Number of total shards must be greater than 1";
return false;
} else if (!conf.has_cache() || conf.cache().disabled()) {
// FIXME: if we don't have cache workers, then we can't
// |GenerateSource()| in any separate thread pool.
// Also we don't want to pollute sharded remotes with random
// builds - so just don't use them for now.
LOG(ERROR) << "Can't use sharded remotes with disabled local cache";
return false;
}
}

if (!conf.has_cache() || conf.cache().disabled()) {
for (const auto& remote : emitter.remotes()) {
if (!remote.disabled() && remote.has_shard()) {
// FIXME: if we don't have cache workers, then we can't
// |GenerateSource()| in any separate thread pool.
// Also we don't want to pollute sharded remotes with random
// builds - so just don't use them for now.
LOG(ERROR) << "Can't use sharded remotes with disabled local cache";
bool has_active_remote = false;
for (const auto& remote : emitter.remotes()) {
if (!remote.disabled()) {
has_active_remote = true;

if (remote.has_shard()) {
if (!emitter.has_total_shards()) {
LOG(ERROR) << "Remote shouldn't have shard when the number of total "
"shards isn't set";
return false;
} else if (remote.shard() >= emitter.total_shards()) {
LOG(ERROR) << "Remote's shard number is out of range (total shards: "
<< emitter.total_shards() << ")";
return false;
}
} else if (emitter.has_total_shards()) {
LOG(ERROR) << "All remotes should have their shards when the number of "
"total shards is set";
return false;
}
}
}

for (const auto& remote : emitter.remotes()) {
if (remote.has_shard() && remote.shard() >= emitter.total_shards()) {
LOG(ERROR) << "Remote's shard number is out of range (total shards: "
<< emitter.total_shards() << ")";
return false;
}
}

if (emitter.total_shards() > max_total_shards) {
LOG(ERROR)
<< "Due to peculiarities of implementation emitter can't use more than "
<< max_total_shards << " total shards";
if (emitter.only_failed() && !has_active_remote) {
// We should always have enabled remotes, even with active coordinators.
// Otherwise we risk never getting any remotes and staying silent about it.
LOG(ERROR) << "Daemon will hang without active remotes and a set flag "
"\"emitter.only_failed\"";
return false;
}

if (emitter.total_shards() == 1) {
LOG(WARNING)
<< "Single shard is meaningless, since it adds CPU load without profit";
}

return true;
}
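Taken together, the rewritten Check() turns the old scattered conditions into one sharding contract: total_shards, when present, must lie in [2, max_total_shards] and requires the local cache, and shard numbers on enabled remotes must be set exactly when total_shards is set and stay below it. A condensed, hypothetical restatement of those rules, with std::optional modeling proto2 field presence and only the enabled remotes' shards passed in:

#include <cstdint>
#include <optional>
#include <vector>

using ui32 = std::uint32_t;

constexpr ui32 kMaxTotalShards = 1024u;  // mirrors Emitter::max_total_shards

bool CheckShardingRules(std::optional<ui32> total_shards, bool cache_enabled,
                        const std::vector<std::optional<ui32>>& remote_shards) {
  if (total_shards) {
    if (*total_shards < 2 || *total_shards > kMaxTotalShards) {
      return false;  // valid range is [2, kMaxTotalShards]
    }
    if (!cache_enabled) {
      return false;  // sharded remotes require the local cache
    }
  }
  for (const auto& shard : remote_shards) {
    if (shard.has_value() != total_shards.has_value()) {
      return false;  // shards and total_shards must be set together
    }
    if (shard && *shard >= *total_shards) {
      return false;  // each shard must lie in [0, total_shards)
    }
  }
  return true;
}

Note the behavioral change hiding here: the old code warned that a single shard was meaningless and silently let 0 mean "unsharded", while the new code rejects both values and uses field presence alone to switch sharding on.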

@@ -694,12 +698,7 @@ bool Emitter::Reload(const proto::Configuration& conf) {
return optional->GetValue();
};

ui32 shard = Queue::DEFAULT_SHARD;
if (remote.has_shard()) {
use_shards_ = true;
shard = remote.shard();
}

ui32 shard = remote.has_shard() ? remote.shard() : Queue::DEFAULT_SHARD;
Worker worker =
std::bind(&Emitter::DoRemoteExecute, this, _1, resolver, shard);
new_pool->AddWorker("Remote Execute Worker"_l, worker, remote.threads());
4 changes: 0 additions & 4 deletions src/daemon/emitter.h
@@ -57,10 +57,6 @@ class Emitter : public CompilationDaemon {
// Indicates if we force shutdown of the remote workers pool: we shouldn't if
// there are no coordinators, or if we have stopped polling coordinators.

bool use_shards_ = false;
// Indicates whether we should always generate unhandled source for tasks,
// since it's required for proper sharding even without local cache.

static const constexpr ui32 max_total_shards = 1024u;
};

15 changes: 13 additions & 2 deletions src/daemon/emitter_test.cc
@@ -218,7 +218,8 @@ TEST_F(EmitterTest, ConfigurationUpdateFromCoordinator) {
const base::TemporaryDir temp_dir;
const auto expected_code = net::proto::Status::OK;
const auto action = "fake_action"_l;
const auto handled_source = "fake_source"_l;
const auto handled_source1 = "fake_source1"_l;
const auto handled_source2 = "fake_source2"_l;
const String object_code = "fake_object_code";
const String compiler_version = "fake_compiler_version";
const String compiler_path = "fake_compiler_path";
@@ -236,6 +237,9 @@

conf.mutable_emitter()->set_only_failed(true);
conf.mutable_emitter()->set_poll_interval(1u);
conf.mutable_cache()->set_path(temp_dir);
conf.mutable_cache()->set_direct(false);
conf.mutable_cache()->set_clean_period(1);

auto* coordinator = conf.mutable_emitter()->add_coordinators();
coordinator->set_host(coordinator_host);
@@ -267,6 +271,7 @@
auto* remote = emitter->add_remotes();
remote->set_host(old_remote_host);
remote->set_port(old_remote_port);
remote->set_shard(1);
remote->set_threads(1);

emitter->set_total_shards(old_total_shards);
@@ -296,6 +301,7 @@
auto* remote = emitter->add_remotes();
remote->set_host(new_remote_host);
remote->set_port(new_remote_port);
remote->set_shard(1);
remote->set_threads(1);

emitter->set_total_shards(new_total_shards);
@@ -346,6 +352,7 @@

EXPECT_EQ(EndPointString(new_remote_host, new_remote_port),
end_point->Print());
DCHECK(emitter);
EXPECT_EQ(new_total_shards, emitter->conf()->emitter().total_shards());

connection->CallOnSend([this](const net::Connection::Message&) {
Expand All @@ -364,7 +371,11 @@ TEST_F(EmitterTest, ConfigurationUpdateFromCoordinator) {

run_callback = [&](base::TestProcess* process) {
EXPECT_EQ((Immutable::Rope{"-E"_l, "-o"_l, "-"_l}), process->args_);
process->stdout_ = handled_source;
if (run_count == 1) {
process->stdout_ = handled_source1;
} else if (run_count == 2) {
process->stdout_ = handled_source2;
}
};

emitter = std::make_unique<Emitter>(conf);
