Skip to content

Commit

Permalink
Enforce monotonicity for timestamps in event parser (#1733)
Browse files Browse the repository at this point in the history
Summary: Ensures that timestamps in the event parser are monotonically
increasing.

The current stitching logic for CQL relies on the assumption that frame
deques are chronologically sorted based on their timestamps. Timestamps
from eBPF, however, may not always be monotonic due to CPU variances,
with potential discrepancies of [up to 880
usec](https://lore.kernel.org/bpf/CAJD7tkYOs4LKa=j+xNRMRiK=ors7_uCBtAjp6axRNQo0NHQqWA@mail.gmail.com/).
This change catches potential anomalies due to non-monotonic timestamps
by ensuring that the next frame's timestamp will always be greater than
the previous one.

The DataStreamBuffer itself relies on byte positions, not timestamps,
for its internal ordering so it won't be affected by this timestamp
issue directly. Although DataTables are sorted before being pushed to
the TableStore, the timestamp variance from eBPF might still have
implications. More experiments will be required to determine how
frequently this condition is hit in the wild.

Type of change: /kind bug

Test Plan: Existing targets.

---------

Signed-off-by: Benjamin Kilimnik <[email protected]>
  • Loading branch information
benkilimnik authored Nov 2, 2023
1 parent 6526dcc commit 1d067cf
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,24 @@ class DataStreamBuffer {

/**
* Get timestamp recorded for the data at the specified position.
* If less than previous timestamp, timestamp will be adjusted to be monotonically increasing.
* @param pos The logical position of the data.
* @return The timestamp or error if the position does not contain valid data.
*/
StatusOr<uint64_t> GetTimestamp(size_t pos) const { return impl_->GetTimestamp(pos); }
StatusOr<uint64_t> GetTimestamp(size_t pos) {
StatusOr<uint64_t> timestamp_ns_status = impl_->GetTimestamp(pos);
if (!timestamp_ns_status.ok()) {
return timestamp_ns_status;
}
uint64_t current_timestamp_ns = timestamp_ns_status.ConsumeValueOrDie();
if (current_timestamp_ns < prev_timestamp_ns_) {
LOG(WARNING) << "Detected non-monotonically increasing timestamp " << current_timestamp_ns
<< ". Adjusting to previous timestamp + 1: " << prev_timestamp_ns_ + 1;
current_timestamp_ns = prev_timestamp_ns_ + 1;
}
prev_timestamp_ns_ = current_timestamp_ns;
return current_timestamp_ns;
}

/**
* Remove n bytes from the head of the buffer.
Expand Down Expand Up @@ -148,6 +162,7 @@ class DataStreamBuffer {

private:
std::unique_ptr<DataStreamBufferImpl> impl_;
uint64_t prev_timestamp_ns_ = 0;
};

} // namespace protocols
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ TEST_P(DataStreamBufferTest, Timestamp) {
EXPECT_OK_AND_EQ(stream_buffer.GetTimestamp(4), 4);
EXPECT_OK_AND_EQ(stream_buffer.GetTimestamp(7), 4);
EXPECT_NOT_OK(stream_buffer.GetTimestamp(8));

// Test automatic adjustment of non-monotonic timestamp
stream_buffer.Add(8, "89", 3); // timestamp is 3, which is less than previous timestamp 4
EXPECT_EQ(stream_buffer.Head(), "123456789");
EXPECT_OK_AND_EQ(stream_buffer.GetTimestamp(8),
5); // timestamp is adjusted to previous timestamp + 1
}

TEST_P(DataStreamBufferTest, TimestampWithGap) {
Expand Down

0 comments on commit 1d067cf

Please sign in to comment.