diff --git a/fuzzer/hashtest/BUILD b/fuzzer/hashtest/BUILD index 3205ac7d..2ee515fc 100644 --- a/fuzzer/hashtest/BUILD +++ b/fuzzer/hashtest/BUILD @@ -126,10 +126,12 @@ cc_library( deps = [ ":hashtest_generator_lib", "@silifuzz//instruction:xed_util", + "@silifuzz//util:cpu_id", "@silifuzz//util:page_util", "@cityhash", "@com_google_absl//absl/log:check", "@com_google_absl//absl/strings", + "@com_google_absl//absl/synchronization", "@com_google_absl//absl/types:span", ], ) @@ -183,7 +185,9 @@ cc_binary( deps = [ ":hashtest_generator_lib", ":hashtest_runner_lib", + ":parallel_worker_pool", "@silifuzz//instruction:xed_util", + "@silifuzz//util:cpu_id", "@silifuzz//util:enum_flag_types", "@silifuzz//util:itoa", "@silifuzz//util:platform", diff --git a/fuzzer/hashtest/hashtest_runner.cc b/fuzzer/hashtest/hashtest_runner.cc index 6dfa4dc1..f0e9cbc6 100644 --- a/fuzzer/hashtest/hashtest_runner.cc +++ b/fuzzer/hashtest/hashtest_runner.cc @@ -29,6 +29,7 @@ #include "absl/log/check.h" #include "absl/strings/str_cat.h" +#include "absl/synchronization/mutex.h" #include "absl/types/span.h" #include "third_party/cityhash/city.h" #include "./fuzzer/hashtest/hashtest_runner_widgits.h" @@ -36,6 +37,7 @@ #include "./fuzzer/hashtest/synthesize_base.h" #include "./fuzzer/hashtest/synthesize_test.h" #include "./instruction/xed_util.h" +#include "./util/cpu_id.h" #include "./util/page_util.h" namespace silifuzz { @@ -143,16 +145,19 @@ Corpus SynthesizeCorpus(Rng& rng, xed_chip_enum_t chip, }; } -void ResultReporter::ReportHit(size_t test_index, const Test& test, +void ResultReporter::ReportHit(int cpu, size_t test_index, const Test& test, size_t input_index, const Input& input) { + absl::MutexLock lock(&mutex); + hits.push_back({ + .cpu = cpu, .test_index = test_index, .test_seed = test.seed, .input_index = input_index, .input_seed = input.seed, }); - std::cout << "Hit " << FormatSeed(test.seed) << " / " + std::cout << "CPU " << cpu << " hit " << FormatSeed(test.seed) << " / " << FormatSeed(input.seed) << "\n"; } @@ -234,7 +239,7 @@ void RunTest(size_t test_index, const Test& test, const TestConfig& config, bool ok = expected.hash == EntropyBufferHash(actual, config.vector_width); if (!ok) { - result.ReportHit(test_index, test, input_index, input); + result.ReportHit(GetCPUId(), test_index, test, input_index, input); } } @@ -256,10 +261,7 @@ void RunBatch(absl::Span tests, absl::Span inputs, continue; } size_t test_index = test_offset + t; - if (r == 0 && i == 0 && test_index % 1000 == 0) { - std::cout << "Test " << test_index << " / " << FormatSeed(test.seed) - << "\n"; - } + // TODO(ncbray): display a heartbeat, of some sort. RunTest(test_index, test, config.test, i, input, expected, result); } } diff --git a/fuzzer/hashtest/hashtest_runner.h b/fuzzer/hashtest/hashtest_runner.h index 9860608c..d2e9833c 100644 --- a/fuzzer/hashtest/hashtest_runner.h +++ b/fuzzer/hashtest/hashtest_runner.h @@ -21,6 +21,7 @@ #include #include +#include "absl/synchronization/mutex.h" #include "absl/types/span.h" #include "./fuzzer/hashtest/instruction_pool.h" #include "./fuzzer/hashtest/synthesize_base.h" @@ -166,6 +167,8 @@ size_t ReconcileEndStates(absl::Span end_state, // All the information we want to remember about each hit. struct Hit { + // CPU the hit occurred on. + int cpu; // A unique identifier in the range [0, num_tests_generated) where // num_tests_generated is the total number of tests generated during this // invocation of the runner. (Each test has a unique index.) @@ -183,13 +186,15 @@ struct Hit { // An interface for reporting the results of test execution. struct ResultReporter { - void ReportHit(size_t test_index, const Test& test, size_t input_index, - const Input& input); + void ReportHit(int cpu, size_t test_index, const Test& test, + size_t input_index, const Input& input); // It's usually much more compact to collect each hit rather than keep // per-test statistics. We can always recreate those statistics later from the // hits. std::vector hits; + + absl::Mutex mutex; }; // The configuration for running multiple tests. diff --git a/fuzzer/hashtest/hashtest_runner_main.cc b/fuzzer/hashtest/hashtest_runner_main.cc index e7997cce..4fa105ba 100644 --- a/fuzzer/hashtest/hashtest_runner_main.cc +++ b/fuzzer/hashtest/hashtest_runner_main.cc @@ -29,8 +29,10 @@ #include "absl/types/span.h" #include "./fuzzer/hashtest/hashtest_runner.h" #include "./fuzzer/hashtest/instruction_pool.h" +#include "./fuzzer/hashtest/parallel_worker_pool.h" #include "./fuzzer/hashtest/synthesize_base.h" #include "./instruction/xed_util.h" +#include "./util/cpu_id.h" #include "./util/enum_flag_types.h" #include "./util/itoa.h" #include "./util/platform.h" @@ -66,26 +68,116 @@ std::vector GenerateInputs(Rng& rng, size_t num_inputs) { return inputs; } +// A list of tests to compute end states for. +struct EndStateSubtask { + absl::Span tests; + absl::Span end_states; +}; + +// Three lists of tests to compute end states for. +struct EndStateTask { + absl::Span inputs; + EndStateSubtask subtask0; + EndStateSubtask subtask1; + EndStateSubtask subtask2; +}; + +struct TestPartition { + // The first test included in the partition. + size_t offset; + // The number of tests in the partition. + size_t size; +}; + +// Divide the tests into `num_workers` groups and returns the `index`-th group +// of tests. +TestPartition GetParition(int index, size_t num_tests, size_t num_workers) { + CHECK_LT(index, num_workers); + size_t remainder = num_tests % num_workers; + size_t tests_in_chunk = num_tests / num_workers; + if (index < remainder) { + // The first `remainder` partitions have `tests_in_chunk` + 1 tests. + return TestPartition{ + .offset = index * (tests_in_chunk + 1), + .size = tests_in_chunk + 1, + }; + } else { + // The rest of the partitions have `tests_in_chunk` tests. + return TestPartition{ + .offset = index * tests_in_chunk + remainder, + .size = tests_in_chunk, + }; + } +} + +EndStateSubtask MakeSubtask(int index, size_t num_inputs, size_t num_workers, + absl::Span tests, + absl::Span end_states) { + TestPartition partition = GetParition(index, tests.size(), num_workers); + + return { + .tests = tests.subspan(partition.offset, partition.size), + .end_states = absl::MakeSpan(end_states) + .subspan(partition.offset * num_inputs, + partition.size * num_inputs), + }; +} + // For each test and input, compute an end state. // We compute each end state 3x, and choose an end state that occurred more than // once. If all the end states are different, the end state is marked as bad and // that test+input combination will be skipped when running tests. // In the future we will compute end states on different CPUs to reduce the // chance of the same data corruption occurring multiple times. -std::vector DetermineEndStates(const absl::Span tests, +std::vector DetermineEndStates(ParallelWorkerPool& workers, + const absl::Span tests, const TestConfig& config, const absl::Span inputs) { const size_t num_end_state = tests.size() * inputs.size(); + // Redundant sets of end states. std::vector end_states(num_end_state); std::vector compare1(num_end_state); std::vector compare2(num_end_state); - ComputeEndStates(tests, config, inputs, absl::MakeSpan(end_states)); - ComputeEndStates(tests, config, inputs, absl::MakeSpan(compare1)); - ComputeEndStates(tests, config, inputs, absl::MakeSpan(compare2)); + size_t num_workers = workers.NumWorkers(); + + // Partition work. + std::vector tasks(num_workers); + for (size_t i = 0; i < num_workers; ++i) { + EndStateTask& task = tasks[i]; + task.inputs = inputs; + + // For each of the redundant set of end states, compute a different + // partition on this core. + // Generating end states is pretty fast. The reason we're doing it on + // multiple cores is to try and ensure (to the greatest extent possible) + // that different cores are computing each redudnant version of the end + // state. This makes it unlikely that the same SDC will corrupt the end + // state twice. In cases where we are running on fewer than three cores, + // some of the redundant end states will be computed on the same core. + task.subtask0 = MakeSubtask(i, inputs.size(), num_workers, tests, + absl::MakeSpan(end_states)); + task.subtask1 = MakeSubtask((i + 1) % num_workers, inputs.size(), + num_workers, tests, absl::MakeSpan(compare1)); + task.subtask2 = MakeSubtask((i + 2) % num_workers, inputs.size(), + num_workers, tests, absl::MakeSpan(compare2)); + } - ReconcileEndStates(absl::MakeSpan(end_states), compare1, compare2); + // Execute. + workers.DoWork(tasks, [&](EndStateTask& task) { + ComputeEndStates(task.subtask0.tests, config, task.inputs, + task.subtask0.end_states); + ComputeEndStates(task.subtask1.tests, config, task.inputs, + task.subtask1.end_states); + ComputeEndStates(task.subtask2.tests, config, task.inputs, + task.subtask2.end_states); + }); + + // Try to guess which end states are correct, based on the redundancy. + size_t bad = + ReconcileEndStates(absl::MakeSpan(end_states), compare1, compare2); + std::cout << "Failed to reconcile " << bad << " end states." << "\n"; return end_states; } @@ -145,6 +237,18 @@ int TestMain(std::vector positional_args) { InstructionPool ipool{}; GenerateInstructionPool(rng, chip, ipool, verbose); + std::vector cpu_list; + ForEachAvailableCPU([&](int cpu) { cpu_list.push_back(cpu); }); + std::cout << "\n"; + std::cout << "Num threads: " << cpu_list.size() << "\n"; + CHECK_GT(cpu_list.size(), 0); + + // Create a pool of worker threads. + ParallelWorkerPool workers(cpu_list.size()); + + // Bind each worker thread to one of the available CPUs. + workers.DoWork(cpu_list, [](int cpu) { SetCPUAffinity(cpu); }); + absl::Duration code_gen_time; absl::Duration end_state_gen_time; absl::Duration test_time; @@ -184,7 +288,7 @@ int TestMain(std::vector positional_args) { std::cout << "Generating end states" << "\n"; begin = absl::Now(); std::vector end_states = - DetermineEndStates(corpus.tests, config.test, inputs); + DetermineEndStates(workers, corpus.tests, config.test, inputs); end_state_gen_time += absl::Now() - begin; std::cout << "End state size: " << (end_states.size() * sizeof(end_states[0]) / (1024 * 1024)) @@ -192,7 +296,11 @@ int TestMain(std::vector positional_args) { // Run test corpus. begin = absl::Now(); - RunTests(corpus.tests, inputs, end_states, config, c * num_tests, result); + // HACK: currently we don't have any per-thread state, so we're passing in + // the cpu id. In a future change, real per-thread state will be added. + workers.DoWork(cpu_list, [&](int cpu) { + RunTests(corpus.tests, inputs, end_states, config, c * num_tests, result); + }); test_time += absl::Now() - begin; // Count how many times each test hit. @@ -220,7 +328,7 @@ int TestMain(std::vector positional_args) { if (hit_count[t] > 0) { test_hits += 1; } - test_instance_run += times_test_has_run; + test_instance_run += times_test_has_run * workers.NumWorkers(); } } }