i#6336: Fix multiple replay bugs (#6353)
Adds a new unit test that targets replay with region-of-interest limits
that stop before the end of the input streams.

The test has two variations, one with no interleavings and one with
interleavings, each hitting different bugs.

Fixes one significant bug:

+ The main bug hit and fixed is a race: the candidate record already read
from an input by one output was not queued until after the output gave up
the input lock and later re-acquired it. In that window another output
could pick up that input, think it was at the right spot, and miss the
queued entry, which could then show up at an arbitrary later point.

The input queue is a deque so we can use both ends. The solution is to
put the candidate record in the queue before giving up the lock, and if
the input didn't change upon re-acquiring the lock, to pop it off the
back.
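
As a rough illustration of that queue-then-pop-back pattern, here is a
minimal C++ sketch. It is not the scheduler's code: record_t, input_t,
requeue_around_pick() and the pick_next callback are hypothetical stand-ins
for the logic in next_record().

```cpp
#include <cassert>
#include <deque>
#include <mutex>

// Hypothetical stand-ins for the scheduler's record and per-input state.
struct record_t {
    int value;
};

struct input_t {
    std::mutex lock;
    std::deque<record_t> queue;
};

// Parks the candidate record at the back of cur's queue while cur.lock is
// still held, so an output that grabs cur during the unlocked window sees it.
// pick_next is a hypothetical callback that returns the newly chosen input.
// Returns true if the input is unchanged; in that case the candidate has been
// popped off the back again and stored in `record`.
// Returns false if a different input was chosen; the candidate stays queued
// for whichever output reads cur next.
template <typename PickNext>
bool
requeue_around_pick(input_t &cur, record_t &record, PickNext pick_next)
{
    std::unique_lock<std::mutex> lock(cur.lock);
    cur.queue.push_back(record);
    lock.unlock();
    input_t *next = pick_next(); // Runs without cur.lock held.
    if (next != &cur)
        return false;
    lock.lock();
    // Same input: take our candidate back from the back of the deque.
    record = cur.queue.back();
    cur.queue.pop_back();
    return true;
}
```

In this sketch the candidate goes on the back and comes off the back, so
anything already waiting at the front of the queue keeps its order.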

This requires some other changes: for skips we need to throw away the
newly-queued item, which could include an instr (so our assert on queued
instrs needs adjusting); and we now need to empty the queue for a
synthetic-end marker.

This was tested by running the unit test, which was failing every 3rd run
or so, 1000 times. It was also tested on a larger proprietary test which
was failing every time: it now passes 2000 times in a row.

Two other fixes from the interleaving test:

+ On synthetic end, don't skip past it: else in some cases we miss the
synthetic exit record.

+ Don't consider the input to be at a stop point while there are still
entries in the queue.
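
The stop-point rule can be sketched as a small predicate. This is a
simplified model under stated assumptions: segment_t and at_segment_end()
are hypothetical names, and the condition mirrors the check added to
next_record(), including its exception for a starter 0,0 entry.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical reduced form of a replay segment, in instruction ordinals.
struct segment_t {
    uint64_t start;
    uint64_t stop; // Exclusive.
};

// Returns whether the output should look for a new input.
// A record that came from the queue does not count as reaching the stop
// point, except for a starter 0,0 segment that precedes a first skip.
bool
at_segment_end(uint64_t cur_ordinal, const segment_t &seg, bool from_queue)
{
    bool starter_entry = seg.start == 0 && seg.stop == 0;
    return cur_ordinal >= seg.stop && (!from_queue || starter_entry);
}
```

For example, at ordinal 10 in a segment that stops at 10, a record read
from the reader ends the segment, while a record taken from the queue does
not.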

Fixes two replay bugs hit with the non-interleaving test:

+ A skip on replay failed to change the input because the code returned
right after pick_next_input_as_previously() instead of going on to set the
new input. We change it to just break out of the loop. Without this fix,
the new unit test fails to run an entire input on replay (with no
interleaving).

+ Non-file-based readers, like the mock_reader_t using a vector, were
missing a reader init call on a replay path with a skip. Without this
fix, the new unit test fails to run an entire input on replay (with no
interleaving).

Fixes two bugs hit in a larger app which did not easily reproduce
despite my attempts:

+ A hang: in pick_next_input_as_previously() the input instruction
ordinal wasn't yet at the beyond-limit ordinal for the synthetic exit
record so it kept doing a wait. The fix is to check for a synthetic exit
record. This one was hard to replicate so the unit test does not
reliably hit it, but the fix was tested on the original scenario.

+ "Failed to read from trace": pick_next_input_as_previously() returns
STATUS_SKIPPED when it hits the end (beyond setting at_eof) because it
needs to read the synthetic exit record inserted in the queue. But a
core that was .waiting=true errors out on non-STATUS_OK: so we change
that to not error on SKIPPED and it's fixed. This one was hard to
replicate so the unit test does not reliably hit it, but the fix was
tested on the original scenario.
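
The two conditions behind these fixes can be sketched as small predicates.
This is only a model: the enums, should_wait() and must_return_early() are
hypothetical names, and the logic follows the conditions changed in
pick_next_input_as_previously() and next_record().

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical reduced forms of the schedule record type and stream status.
enum class seg_type_t { DEFAULT, SKIP, SYNTHETIC_END };
enum class status_t { OK, WAIT, SKIPPED, INVALID };

// Returns whether an output should wait for another output to advance the
// input up to seg_start. has_prev is false when the output has not yet
// started on any segment (it may have begun in the wait state).
bool
should_wait(uint64_t cur_ordinal, uint64_t seg_start, seg_type_t seg_type,
            bool has_prev, seg_type_t prev_type)
{
    // No wait if the input is already far enough along, or if the region
    // starts at the beginning.
    if (cur_ordinal >= seg_start || seg_start <= 1)
        return false;
    if (!has_prev)
        return true;
    // After a skip the markers sit at the prior ordinal; at a synthetic end
    // only the exit record is needed. Neither case should wait.
    return prev_type != seg_type_t::SKIP && seg_type != seg_type_t::SYNTHETIC_END;
}

// Returns whether a previously waiting output should hand the status back
// to its caller instead of continuing to read. SKIPPED is tolerated so the
// queued synthetic exit record can still be read.
bool
must_return_early(status_t res)
{
    return res != status_t::OK && res != status_t::SKIPPED;
}
```

With the synthetic-end check in place, an input sitting at ordinal 5 whose
next segment is a synthetic end starting at 10 no longer waits.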

Fixes #6336
derekbruening authored Oct 10, 2023
1 parent 910ccb2 commit 54f393b
Showing 3 changed files with 321 additions and 29 deletions.
146 changes: 117 additions & 29 deletions clients/drcachesim/scheduler/scheduler.cpp
@@ -1370,22 +1370,42 @@ scheduler_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(
return skip_instructions(output, input, cur_range.start_instruction - cur_instr - 1);
}

template <typename RecordType, typename ReaderType>
void
scheduler_tmpl_t<RecordType, ReaderType>::clear_input_queue(input_info_t &input)
{
// We assume the queue contains no instrs other than the single candidate record we
// ourselves read but did not pass to the user (else our query of input.reader's
// instr ordinal would include them and so be incorrect) and that we should thus
// skip it all when skipping ahead in the input stream.
int i = 0;
while (!input.queue.empty()) {
assert(i == 0 || !record_type_is_instr(input.queue.front()));
++i;
input.queue.pop_front();
}
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::skip_instructions(output_ordinal_t output,
input_info_t &input,
uint64_t skip_amount)
{
// We assume the queue contains no instrs (else our query of input.reader's
// instr ordinal would include them and so be incorrect) and that we should
// thus skip it all.
while (!input.queue.empty()) {
assert(!record_type_is_instr(input.queue.front()));
input.queue.pop_front();
// reader_t::at_eof_ is true until init() is called.
if (input.needs_init) {
input.reader->init();
input.needs_init = false;
}
// For a skip of 0 we still need to clear non-instrs from the queue, but
// should not have an instr in there.
assert(skip_amount > 0 || input.queue.empty() ||
!record_type_is_instr(input.queue.front()));
clear_input_queue(input);
input.reader->skip_instructions(skip_amount);
if (*input.reader == *input.reader_end) {
// Raise error because the input region is out of bounds.
VPRINT(this, 2, "skip_instructions: input=%d skip out of bounds\n", input.index);
input.at_eof = true;
return sched_type_t::STATUS_REGION_INVALID;
}
@@ -1483,6 +1503,10 @@ scheduler_tmpl_t<RecordType, ReaderType>::close_schedule_segment(output_ordinal_
VPRINT(this, 3,
"close_schedule_segment: input=%d start=%" PRId64 " stop=%" PRId64 "\n",
input.index, outputs_[output].record.back().start_instruction, instr_ord);
// Check for empty default entries, except the starter 0,0 ones.
assert(outputs_[output].record.back().type != schedule_record_t::DEFAULT ||
outputs_[output].record.back().start_instruction < instr_ord ||
instr_ord == 0);
outputs_[output].record.back().stop_instruction = instr_ord;
return sched_type_t::STATUS_OK;
}
@@ -1602,6 +1626,11 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
const schedule_record_t &segment =
outputs_[output].record[outputs_[output].record_index + 1];
index = segment.key.input;
VPRINT(this, 5,
"pick_next_input_as_previously[%d]: next replay segment in=%d (@%" PRId64
") type=%d start=%" PRId64 " end=%" PRId64 "\n",
output, index, inputs_[index].reader->get_instruction_ordinal(), segment.type,
segment.start_instruction, segment.stop_instruction);
{
std::lock_guard<std::mutex> lock(*inputs_[index].lock);
if (inputs_[index].reader->get_instruction_ordinal() >
@@ -1614,12 +1643,16 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
}
if (inputs_[index].reader->get_instruction_ordinal() <
segment.start_instruction &&
// Don't wait for an ROI that starts at the beginning.
segment.start_instruction > 1 &&
// The output may have begun in the wait state.
(outputs_[output].record_index == -1 ||
// When we skip our separator+timestamp markers are at the
// prior instr ord so do not wait for that.
outputs_[output].record[outputs_[output].record_index].type !=
schedule_record_t::SKIP)) {
(outputs_[output].record[outputs_[output].record_index].type !=
schedule_record_t::SKIP &&
// Don't wait if we're at the end and just need the end record.
segment.type != schedule_record_t::SYNTHETIC_END))) {
// Some other output stream has not advanced far enough, and we do
// not support multiple positions in one input stream: we wait.
// XXX i#5843: We may want to provide a kernel-mediated wait
@@ -1659,18 +1692,23 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
if (segment.type == schedule_record_t::SYNTHETIC_END) {
std::lock_guard<std::mutex> lock(*inputs_[index].lock);
// We're past the final region of interest and we need to insert
// a synthetic thread exit record.
// a synthetic thread exit record. We need to first throw out the
// queued candidate record, if any.
clear_input_queue(inputs_[index]);
inputs_[index].queue.push_back(create_thread_exit(inputs_[index].tid));
inputs_[index].at_eof = true;
VPRINT(this, 2, "early end for input %d\n", index);
// We're done with this entry so move to and past it.
outputs_[output].record_index += 2;
// We're done with this entry but we need the queued record to be read,
// so we do not move past the entry.
++outputs_[output].record_index;
return sched_type_t::STATUS_SKIPPED;
} else if (segment.type == schedule_record_t::SKIP) {
std::lock_guard<std::mutex> lock(*inputs_[index].lock);
uint64_t cur_instr = inputs_[index].reader->get_instruction_ordinal();
VPRINT(this, 2, "skipping from %" PRId64 " to %" PRId64 " instrs for schedule\n",
cur_instr, segment.stop_instruction);
VPRINT(this, 2,
"next_record[%d]: skipping from %" PRId64 " to %" PRId64
" in %d for schedule\n",
output, cur_instr, segment.stop_instruction, index);
auto status =
skip_instructions(output, inputs_[index],
segment.stop_instruction - cur_instr - 1 /*exclusive*/);
@@ -1694,6 +1732,7 @@ typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t output,
bool in_wait_state)
{
sched_type_t::stream_status_t res = sched_type_t::STATUS_OK;
bool need_lock =
options_.mapping == MAP_TO_ANY_OUTPUT || options_.mapping == MAP_AS_PREVIOUSLY;
auto scoped_lock = need_lock ? std::unique_lock<std::mutex>(sched_lock_)
@@ -1703,8 +1742,25 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
while (true) {
if (index < 0) {
if (options_.mapping == MAP_AS_PREVIOUSLY) {
sched_type_t::stream_status_t res =
pick_next_input_as_previously(output, index);
res = pick_next_input_as_previously(output, index);
VDO(this, 2, {
if (outputs_[output].record_index >= 0 &&
outputs_[output].record_index <
static_cast<int>(outputs_[output].record.size())) {
const schedule_record_t &segment =
outputs_[output].record[outputs_[output].record_index];
int input = segment.key.input;
VPRINT(this, res == sched_type_t::STATUS_WAIT ? 3 : 2,
"next_record[%d]: replay segment in=%d (@%" PRId64
") type=%d start=%" PRId64 " end=%" PRId64 "\n",
output, input,
inputs_[input].reader->get_instruction_ordinal(),
segment.type, segment.start_instruction,
segment.stop_instruction);
}
});
if (res == sched_type_t::STATUS_SKIPPED)
break;
if (res != sched_type_t::STATUS_OK)
return res;
} else if (options_.mapping == MAP_TO_ANY_OUTPUT) {
@@ -1787,7 +1843,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
break;
}
set_cur_input(output, index);
return sched_type_t::STATUS_OK;
return res;
}

template <typename RecordType, typename ReaderType>
@@ -1804,8 +1860,9 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
if (!outputs_[output].active)
return sched_type_t::STATUS_WAIT;
if (outputs_[output].waiting) {
VPRINT(this, 5, "next_record[%d]: need new input (cur=waiting)\n", output);
sched_type_t::stream_status_t res = pick_next_input(output, true);
if (res != sched_type_t::STATUS_OK)
if (res != sched_type_t::STATUS_OK && res != sched_type_t::STATUS_SKIPPED)
return res;
outputs_[output].waiting = false;
}
@@ -1833,6 +1890,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
return sched_type_t::STATUS_OK;
}
while (true) {
bool from_queue = false;
if (input->needs_init) {
// We pay the cost of this conditional to support ipc_reader_t::init() which
// blocks and must be called right before reading its first record.
@@ -1846,6 +1904,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
if (!input->queue.empty()) {
record = input->queue.front();
input->queue.pop_front();
from_queue = true;
} else {
// We again have a flag check because reader_t::init() does an initial ++
// and so we want to skip that on the first record but perform a ++ prior
@@ -1859,18 +1918,25 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
}
if (input->at_eof || *input->reader == *input->reader_end) {
lock.unlock();
VPRINT(this, 5, "next_record[%d]: need new input (cur=%d eof)\n", output,
input->index);
sched_type_t::stream_status_t res = pick_next_input(output, false);
if (res != sched_type_t::STATUS_OK)
if (res != sched_type_t::STATUS_OK && res != sched_type_t::STATUS_SKIPPED)
return res;
input = &inputs_[outputs_[output].cur_input];
lock = std::unique_lock<std::mutex>(*input->lock);
if (res == sched_type_t::STATUS_SKIPPED) {
// Like for the ROI below, we need the queue or a de-ref.
input->needs_advance = false;
}
continue;
} else {
record = **input->reader;
}
}
VPRINT(this, 5, "next_record[%d]: candidate record from %d: ", output,
input->index);
VPRINT(this, 5,
"next_record[%d]: candidate record from %d (@%" PRId64 "): ", output,
input->index, input->reader->get_instruction_ordinal());
VDO(this, 5, print_record(record););
bool need_new_input = false;
bool in_wait_state = false;
@@ -1881,15 +1947,27 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
// We're on the last record.
} else if (outputs_[output].record[outputs_[output].record_index].type ==
schedule_record_t::SKIP) {
VPRINT(this, 5, "next_record[%d]: need new input after skip\n", output);
need_new_input = true;
} else {
uint64_t stop = outputs_[output]
.record[outputs_[output].record_index]
.stop_instruction;
const schedule_record_t &segment =
outputs_[output].record[outputs_[output].record_index];
uint64_t start = segment.start_instruction;
uint64_t stop = segment.stop_instruction;
// The stop is exclusive. 0 does mean to do nothing (easiest
// to have an empty record to share the next-entry for a start skip
// or other cases).
if (input->reader->get_instruction_ordinal() >= stop) {
// Only check for stop when we've exhausted the queue, or we have
// a starter schedule with a 0,0 entry prior to a first skip entry
// (as just mentioned, it is easier to have a seemingly-redundant entry
// to get into the trace reading loop and then do something like a skip
// from the start rather than adding logic into the setup code).
if (input->reader->get_instruction_ordinal() >= stop &&
(!from_queue || (start == 0 && stop == 0))) {
VPRINT(this, 5,
"next_record[%d]: need new input: at end of segment in=%d "
"stop=%" PRId64 "\n",
output, input->index, stop);
need_new_input = true;
}
}
@@ -1958,6 +2036,12 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
need_new_input = true;
if (need_new_input) {
int prev_input = outputs_[output].cur_input;
VPRINT(this, 5, "next_record[%d]: need new input (cur=%d)\n", output,
prev_input);
// We have to put the candidate record in the queue before we release
// the lock since another output may grab this input.
VPRINT(this, 5, "next_record[%d]: queuing candidate record\n", output);
input->queue.push_back(record);
lock.unlock();
sched_type_t::stream_status_t res = pick_next_input(output, in_wait_state);
if (res != sched_type_t::STATUS_OK && res != sched_type_t::STATUS_WAIT &&
@@ -1973,17 +2057,21 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
// for instr count too) -- but what about output during speculation?
// Decrement counts instead to undo?
lock.lock();
VPRINT(this, 5, "next_record_mid[%d]: from %d: queueing ", output,
input->index);
VDO(this, 5, print_record(record););
input->queue.push_back(record);
VPRINT(this, 5, "next_record_mid[%d]: switching from %d to %d\n", output,
prev_input, outputs_[output].cur_input);
if (res == sched_type_t::STATUS_WAIT)
return res;
input = &inputs_[outputs_[output].cur_input];
lock = std::unique_lock<std::mutex>(*input->lock);
continue;
} else
} else {
lock.lock();
if (res != sched_type_t::STATUS_SKIPPED) {
// Get our candidate record back.
record = input->queue.back();
input->queue.pop_back();
}
}
if (res == sched_type_t::STATUS_SKIPPED) {
// Like for the ROI below, we need the queue or a de-ref.
input->needs_advance = false;
5 changes: 5 additions & 0 deletions clients/drcachesim/scheduler/scheduler.h
@@ -1091,6 +1091,11 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
advance_region_of_interest(output_ordinal_t output, RecordType &record,
input_info_t &input);

// Discards the contents of the input queue. Meant to be used when skipping
// input records.
void
clear_input_queue(input_info_t &input);

// Does a direct skip, unconditionally.
// The caller must hold the input.lock.
stream_status_t