Skip to content

Commit

Permalink
hashtest: execute in parallel on all cores
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 669414648
  • Loading branch information
ncbray authored and copybara-github committed Aug 30, 2024
1 parent ae49dfb commit ea24a15
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 17 deletions.
4 changes: 4 additions & 0 deletions fuzzer/hashtest/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,12 @@ cc_library(
deps = [
":hashtest_generator_lib",
"@silifuzz//instruction:xed_util",
"@silifuzz//util:cpu_id",
"@silifuzz//util:page_util",
"@cityhash",
"@com_google_absl//absl/log:check",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/synchronization",
"@com_google_absl//absl/types:span",
],
)
Expand Down Expand Up @@ -183,7 +185,9 @@ cc_binary(
deps = [
":hashtest_generator_lib",
":hashtest_runner_lib",
":parallel_worker_pool",
"@silifuzz//instruction:xed_util",
"@silifuzz//util:cpu_id",
"@silifuzz//util:enum_flag_types",
"@silifuzz//util:itoa",
"@silifuzz//util:platform",
Expand Down
16 changes: 9 additions & 7 deletions fuzzer/hashtest/hashtest_runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@

#include "absl/log/check.h"
#include "absl/strings/str_cat.h"
#include "absl/synchronization/mutex.h"
#include "absl/types/span.h"
#include "third_party/cityhash/city.h"
#include "./fuzzer/hashtest/hashtest_runner_widgits.h"
#include "./fuzzer/hashtest/instruction_pool.h"
#include "./fuzzer/hashtest/synthesize_base.h"
#include "./fuzzer/hashtest/synthesize_test.h"
#include "./instruction/xed_util.h"
#include "./util/cpu_id.h"
#include "./util/page_util.h"

namespace silifuzz {
Expand Down Expand Up @@ -143,16 +145,19 @@ Corpus SynthesizeCorpus(Rng& rng, xed_chip_enum_t chip,
};
}

void ResultReporter::ReportHit(size_t test_index, const Test& test,
void ResultReporter::ReportHit(int cpu, size_t test_index, const Test& test,
size_t input_index, const Input& input) {
absl::MutexLock lock(&mutex);

hits.push_back({
.cpu = cpu,
.test_index = test_index,
.test_seed = test.seed,
.input_index = input_index,
.input_seed = input.seed,
});

std::cout << "Hit " << FormatSeed(test.seed) << " / "
std::cout << "CPU " << cpu << " hit " << FormatSeed(test.seed) << " / "
<< FormatSeed(input.seed) << "\n";
}

Expand Down Expand Up @@ -234,7 +239,7 @@ void RunTest(size_t test_index, const Test& test, const TestConfig& config,
bool ok = expected.hash == EntropyBufferHash(actual, config.vector_width);

if (!ok) {
result.ReportHit(test_index, test, input_index, input);
result.ReportHit(GetCPUId(), test_index, test, input_index, input);
}
}

Expand All @@ -256,10 +261,7 @@ void RunBatch(absl::Span<const Test> tests, absl::Span<const Input> inputs,
continue;
}
size_t test_index = test_offset + t;
if (r == 0 && i == 0 && test_index % 1000 == 0) {
std::cout << "Test " << test_index << " / " << FormatSeed(test.seed)
<< "\n";
}
// TODO(ncbray): display a heartbeat, of some sort.
RunTest(test_index, test, config.test, i, input, expected, result);
}
}
Expand Down
9 changes: 7 additions & 2 deletions fuzzer/hashtest/hashtest_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <string>
#include <vector>

#include "absl/synchronization/mutex.h"
#include "absl/types/span.h"
#include "./fuzzer/hashtest/instruction_pool.h"
#include "./fuzzer/hashtest/synthesize_base.h"
Expand Down Expand Up @@ -166,6 +167,8 @@ size_t ReconcileEndStates(absl::Span<EndState> end_state,

// All the information we want to remember about each hit.
struct Hit {
// CPU the hit occurred on.
int cpu;
// A unique identifier in the range [0, num_tests_generated) where
// num_tests_generated is the total number of tests generated during this
// invocation of the runner. (Each test has a unique index.)
Expand All @@ -183,13 +186,15 @@ struct Hit {

// An interface for reporting the results of test execution.
struct ResultReporter {
void ReportHit(size_t test_index, const Test& test, size_t input_index,
const Input& input);
void ReportHit(int cpu, size_t test_index, const Test& test,
size_t input_index, const Input& input);

// It's usually much more compact to collect each hit rather than keep
// per-test statistics. We can always recreate those statistics later from the
// hits.
std::vector<Hit> hits;

absl::Mutex mutex;
};

// The configuration for running multiple tests.
Expand Down
124 changes: 116 additions & 8 deletions fuzzer/hashtest/hashtest_runner_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
#include "absl/types/span.h"
#include "./fuzzer/hashtest/hashtest_runner.h"
#include "./fuzzer/hashtest/instruction_pool.h"
#include "./fuzzer/hashtest/parallel_worker_pool.h"
#include "./fuzzer/hashtest/synthesize_base.h"
#include "./instruction/xed_util.h"
#include "./util/cpu_id.h"
#include "./util/enum_flag_types.h"
#include "./util/itoa.h"
#include "./util/platform.h"
Expand Down Expand Up @@ -66,26 +68,116 @@ std::vector<Input> GenerateInputs(Rng& rng, size_t num_inputs) {
return inputs;
}

// A list of tests to compute end states for.
struct EndStateSubtask {
absl::Span<const Test> tests;
absl::Span<EndState> end_states;
};

// Three lists of tests to compute end states for.
struct EndStateTask {
absl::Span<const Input> inputs;
EndStateSubtask subtask0;
EndStateSubtask subtask1;
EndStateSubtask subtask2;
};

struct TestPartition {
// The first test included in the partition.
size_t offset;
// The number of tests in the partition.
size_t size;
};

// Divide the tests into `num_workers` groups and returns the `index`-th group
// of tests.
TestPartition GetParition(int index, size_t num_tests, size_t num_workers) {
CHECK_LT(index, num_workers);
size_t remainder = num_tests % num_workers;
size_t tests_in_chunk = num_tests / num_workers;
if (index < remainder) {
// The first `remainder` partitions have `tests_in_chunk` + 1 tests.
return TestPartition{
.offset = index * (tests_in_chunk + 1),
.size = tests_in_chunk + 1,
};
} else {
// The rest of the partitions have `tests_in_chunk` tests.
return TestPartition{
.offset = index * tests_in_chunk + remainder,
.size = tests_in_chunk,
};
}
}

EndStateSubtask MakeSubtask(int index, size_t num_inputs, size_t num_workers,
absl::Span<const Test> tests,
absl::Span<EndState> end_states) {
TestPartition partition = GetParition(index, tests.size(), num_workers);

return {
.tests = tests.subspan(partition.offset, partition.size),
.end_states = absl::MakeSpan(end_states)
.subspan(partition.offset * num_inputs,
partition.size * num_inputs),
};
}

// For each test and input, compute an end state.
// We compute each end state 3x, and choose an end state that occurred more than
// once. If all the end states are different, the end state is marked as bad and
// that test+input combination will be skipped when running tests.
// In the future we will compute end states on different CPUs to reduce the
// chance of the same data corruption occurring multiple times.
std::vector<EndState> DetermineEndStates(const absl::Span<const Test> tests,
std::vector<EndState> DetermineEndStates(ParallelWorkerPool& workers,
const absl::Span<const Test> tests,
const TestConfig& config,
const absl::Span<const Input> inputs) {
const size_t num_end_state = tests.size() * inputs.size();

// Redundant sets of end states.
std::vector<EndState> end_states(num_end_state);
std::vector<EndState> compare1(num_end_state);
std::vector<EndState> compare2(num_end_state);

ComputeEndStates(tests, config, inputs, absl::MakeSpan(end_states));
ComputeEndStates(tests, config, inputs, absl::MakeSpan(compare1));
ComputeEndStates(tests, config, inputs, absl::MakeSpan(compare2));
size_t num_workers = workers.NumWorkers();

// Partition work.
std::vector<EndStateTask> tasks(num_workers);
for (size_t i = 0; i < num_workers; ++i) {
EndStateTask& task = tasks[i];
task.inputs = inputs;

// For each of the redundant set of end states, compute a different
// partition on this core.
// Generating end states is pretty fast. The reason we're doing it on
// multiple cores is to try and ensure (to the greatest extent possible)
// that different cores are computing each redudnant version of the end
// state. This makes it unlikely that the same SDC will corrupt the end
// state twice. In cases where we are running on fewer than three cores,
// some of the redundant end states will be computed on the same core.
task.subtask0 = MakeSubtask(i, inputs.size(), num_workers, tests,
absl::MakeSpan(end_states));
task.subtask1 = MakeSubtask((i + 1) % num_workers, inputs.size(),
num_workers, tests, absl::MakeSpan(compare1));
task.subtask2 = MakeSubtask((i + 2) % num_workers, inputs.size(),
num_workers, tests, absl::MakeSpan(compare2));
}

ReconcileEndStates(absl::MakeSpan(end_states), compare1, compare2);
// Execute.
workers.DoWork(tasks, [&](EndStateTask& task) {
ComputeEndStates(task.subtask0.tests, config, task.inputs,
task.subtask0.end_states);
ComputeEndStates(task.subtask1.tests, config, task.inputs,
task.subtask1.end_states);
ComputeEndStates(task.subtask2.tests, config, task.inputs,
task.subtask2.end_states);
});

// Try to guess which end states are correct, based on the redundancy.
size_t bad =
ReconcileEndStates(absl::MakeSpan(end_states), compare1, compare2);
std::cout << "Failed to reconcile " << bad << " end states." << "\n";

return end_states;
}
Expand Down Expand Up @@ -145,6 +237,18 @@ int TestMain(std::vector<char*> positional_args) {
InstructionPool ipool{};
GenerateInstructionPool(rng, chip, ipool, verbose);

std::vector<int> cpu_list;
ForEachAvailableCPU([&](int cpu) { cpu_list.push_back(cpu); });
std::cout << "\n";
std::cout << "Num threads: " << cpu_list.size() << "\n";
CHECK_GT(cpu_list.size(), 0);

// Create a pool of worker threads.
ParallelWorkerPool workers(cpu_list.size());

// Bind each worker thread to one of the available CPUs.
workers.DoWork(cpu_list, [](int cpu) { SetCPUAffinity(cpu); });

absl::Duration code_gen_time;
absl::Duration end_state_gen_time;
absl::Duration test_time;
Expand Down Expand Up @@ -184,15 +288,19 @@ int TestMain(std::vector<char*> positional_args) {
std::cout << "Generating end states" << "\n";
begin = absl::Now();
std::vector<EndState> end_states =
DetermineEndStates(corpus.tests, config.test, inputs);
DetermineEndStates(workers, corpus.tests, config.test, inputs);
end_state_gen_time += absl::Now() - begin;
std::cout << "End state size: "
<< (end_states.size() * sizeof(end_states[0]) / (1024 * 1024))
<< " MB" << "\n";

// Run test corpus.
begin = absl::Now();
RunTests(corpus.tests, inputs, end_states, config, c * num_tests, result);
// HACK: currently we don't have any per-thread state, so we're passing in
// the cpu id. In a future change, real per-thread state will be added.
workers.DoWork(cpu_list, [&](int cpu) {
RunTests(corpus.tests, inputs, end_states, config, c * num_tests, result);
});
test_time += absl::Now() - begin;

// Count how many times each test hit.
Expand Down Expand Up @@ -220,7 +328,7 @@ int TestMain(std::vector<char*> positional_args) {
if (hit_count[t] > 0) {
test_hits += 1;
}
test_instance_run += times_test_has_run;
test_instance_run += times_test_has_run * workers.NumWorkers();
}
}
}
Expand Down

0 comments on commit ea24a15

Please sign in to comment.