Skip to content

Commit

Permalink
[Fix CQL Stitcher 1/4] Change StitchFrames API for protocols with str…
Browse files Browse the repository at this point in the history
…eams only (#1689)

Summary: This is the first of four PRs aimed at fixing an issue with the
CQL stitcher discussed in #1375. It introduces an additional interface
for `StitchFrames` for protocols with streams:
```cpp
// interface for protocols with streams
StitchFrames(absl::flat_hash_map<stream_id_t, std::deque<Frame>>* req_messages, absl::flat_hash_map<stream_id_t, std::deque<Frame>>* res_messages)
```
The old interface is retained for other protocols by default. To use the
new map interface, protocols must have the `StreamSupport` enum set to
`UseStream` in their `ProtocolTraits`.

```cpp
// interface for protocols without a notion of streams i.e. which don't use streams for frame stitching
StitchFrames(std::deque<Frame>* req_messages, std::deque<Frame>* res_messages)
```
This change should be a no op, as the default interface is used across
the board. The next PR has the CQL stitcher and tests actually use it.

Note that in PRs 1/4 and 2/4 the map is populated after parsing frames
into request/response deques. PRs 3/4 and 4/4 populate the map earlier
during parsing and has protocols without streams use the first key i.e.
a single stream. This is a larger change and therefore separated.

Related issues: #1375

Type of change: /kind bug

Test Plan: Tested all existing targets - `bazel test ...`

Performance: I've benchmarked the performance impact of this API change
using a [new
demo](https://github.com/benkilimnik/pixie-privy/tree/cql-perf-test)
that cycles through streams. The demo uses a [fork of the datastax
driver for cassandra](https://github.com/benkilimnik/python-driver)
which I tinkered with to prevent stream ID reuse. This demo goes all the
way up to `2^15-1`, the absolute worst case in terms of stream ID reuse.
Note that this is probably a very unrealistic scenario (by default, this
particular driver only goes up to 300 streams max), but it helps us test
the limits of the map interface. As a sanity check, I also ran an
experiment for the existing k8ssandra demo.

Note that I only ran one experiment for each of k8ssandra and the
bad-stream-reuse demos. More perf tests may be required to reliably
assess performance impact, but this at least rules out a major
degradation. Also, since this change is currently scoped to only the
cassandra stitcher, it shouldn't impact any of the other protocols.

## Benchmark results
### Python demo poor stream ID reuse

Baseline

![baseline_poor_stream_reuse](https://github.com/pixie-io/pixie/assets/47846691/2e7a99c8-c1bb-4dda-8dc2-202a2c35d18a)

API change (including changes to CQL Stitcher to make use of new
interface PR 2/3 #1715): streams up to `2^15-1`

![map_interface_poor_stream_reuse](https://github.com/pixie-io/pixie/assets/47846691/f946af12-f19f-4cac-9195-b9db709787ba)

### K8ssandra demo
Baseline

![baseline_k8ssandra](https://github.com/pixie-io/pixie/assets/47846691/e564f680-5470-4cc2-9431-507c4c2323ae)

API change (including changes to CQL Stitcher to make use of new
interface in PR 2/3 #1715)

![map_interface_k8ssandra](https://github.com/pixie-io/pixie/assets/47846691/dcd5337f-d5fa-4ce2-9694-53f1ded37fa2)

Signed-off-by: Benjamin Kilimnik <[email protected]>
  • Loading branch information
benkilimnik authored Oct 17, 2023
1 parent dfb073a commit e670eb6
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 8 deletions.
37 changes: 34 additions & 3 deletions src/stirling/source_connectors/socket_tracer/conn_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#pragma once

#include <absl/container/flat_hash_map.h>
#include <any>
#include <deque>
#include <list>
Expand Down Expand Up @@ -271,9 +272,39 @@ class ConnTracker : NotCopyMoveable {
CONN_TRACE(2) << absl::Substitute("req_frames=$0 resp_frames=$1", req_frames.size(),
resp_frames.size());

protocols::RecordsWithErrorCount<TRecordType> result =
protocols::StitchFrames<TRecordType, TFrameType, TStateType>(&req_frames, &resp_frames,
state_ptr);
protocols::RecordsWithErrorCount<TRecordType> result;

// If this protocol doesn't support streams, we call StitchFrames with just the deque.
// If it does, we use a map of stream ID to deque.
// TODO(@benkilimnik): Eventually, we should migrate all of the protocols to use the map.
if constexpr (TProtocolTraits::stream_support ==
protocols::BaseProtocolTraits<TRecordType>::UseStream) {
using TKey = typename TProtocolTraits::key_type;
absl::flat_hash_map<TKey, std::deque<TFrameType>> requests;
absl::flat_hash_map<TKey, std::deque<TFrameType>> responses;
// TODO(@benkilimnik): Hard code the stream for now. Populate the map in a future PR.
requests[0] = std::move(req_frames);
responses[0] = std::move(resp_frames);
result = protocols::StitchFrames<TRecordType, TKey, TFrameType, TStateType>(
&requests, &responses, state_ptr);
// TODO(@benkilimnik): Update req and resp frame deques to match maps for now. Populate maps
// during parsing in a future PR.
req_frames.clear();
for (auto& [_, frames] : requests) {
for (auto& frame : frames) {
req_frames.push_back(std::move(frame));
}
}
resp_frames.clear();
for (auto& [_, frames] : responses) {
for (auto& frame : frames) {
resp_frames.push_back(std::move(frame));
}
}
} else {
result = protocols::StitchFrames<TRecordType, TFrameType, TStateType>(
&req_frames, &resp_frames, state_ptr);
}

CONN_TRACE(2) << absl::Substitute("records=$0", result.records.size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#pragma once

#include <absl/container/flat_hash_map.h>
#include <deque>
#include <variant>
#include <vector>
Expand Down Expand Up @@ -111,6 +112,18 @@ template <typename TFrameType, typename TStateType = NoState>
ParseState ParseFrame(message_type_t type, std::string_view* buf, TFrameType* frame,
TStateType* state = nullptr);

/**
* Returns the stream ID of the given frame.
*
* @tparam TFrameType Type of frame to parse.
* @param frame The frame to get the stream ID from.
* @return The stream ID of the given frame.
*/
template <typename TKey, typename TFrameType>
TKey GetStreamID(TFrameType*) {
return TKey(0);
}

/**
* StitchFrames is the entry point of stitcher for all protocols. It loops through the responses,
* matches them with the corresponding requests, and returns stitched request & response pairs.
Expand All @@ -124,6 +137,18 @@ RecordsWithErrorCount<TRecordType> StitchFrames(std::deque<TFrameType>* requests
std::deque<TFrameType>* responses,
TStateType* state);

/**
* For protocols that support streams, we use a map of stream ID to frames.
*
* @param requests: map of stream ID to deque of request frames.
* @param responses: map of stream ID to deque of response frames.
* @return A vector of entries to be appended to table store.
*/
template <typename TRecordType, typename TKey, typename TFrameType, typename TStateType>
RecordsWithErrorCount<TRecordType> StitchFrames(
absl::flat_hash_map<TKey, std::deque<TFrameType>>* requests,
absl::flat_hash_map<TKey, std::deque<TFrameType>>* responses, TStateType* state);

/**
* The BaseProtocolTraits all ProtocolTraits should inherit from. It provides a default
* UpdateTimestamps method that applies to most protocols.
Expand All @@ -136,6 +161,10 @@ struct BaseProtocolTraits {
record->req.timestamp_ns = func(record->req.timestamp_ns);
record->resp.timestamp_ns = func(record->resp.timestamp_ns);
}
enum StreamSupport { NoStream, UseStream };
// Protocol does not support streams by default. Override this in the derived ProtocolTraits to
// parse frames into map of stream ID to frames instead of a single deque for all streams.
static constexpr StreamSupport stream_support = NoStream;
};

} // namespace protocols
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ size_t FindFrameBoundary<cass::Frame>(message_type_t /*type*/, std::string_view
return std::string::npos;
}

template <>
cass::stream_id_t GetStreamID(cass::Frame* frame) {
return frame->hdr.stream;
}

} // namespace protocols
} // namespace stirling
} // namespace px
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ template <>
size_t FindFrameBoundary<cass::Frame>(message_type_t type, std::string_view buf, size_t start_pos,
NoState* state);

template <>
cass::stream_id_t GetStreamID(cass::Frame* frame);

} // namespace protocols
} // namespace stirling
} // namespace px
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#pragma once

#include <absl/container/flat_hash_map.h>
#include <deque>
#include <map>
#include <string>
Expand Down Expand Up @@ -45,10 +46,11 @@ RecordsWithErrorCount<Record> StitchFrames(std::deque<Frame>* req_frames,
} // namespace cass

template <>
inline RecordsWithErrorCount<cass::Record> StitchFrames(std::deque<cass::Frame>* req_frames,
std::deque<cass::Frame>* resp_frames,
NoState* /* state */) {
return cass::StitchFrames(req_frames, resp_frames);
inline RecordsWithErrorCount<cass::Record> StitchFrames(
absl::flat_hash_map<cass::stream_id_t, std::deque<cass::Frame>>* req_messages,
absl::flat_hash_map<cass::stream_id_t, std::deque<cass::Frame>>* res_messages,
NoState* /* state */) {
return cass::StitchFrames(&((*req_messages)[0]), &((*res_messages)[0]));
}

} // namespace protocols
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,12 @@ inline bool IsRespOpcode(Opcode opcode) {
return resp_opcode.has_value();
}

using stream_id_t = uint16_t;
struct FrameHeader {
// Top bit is direction.
uint8_t version;
uint8_t flags;
uint16_t stream;
stream_id_t stream;
Opcode opcode;
int32_t length;
};
Expand Down Expand Up @@ -171,6 +172,8 @@ struct ProtocolTraits : public BaseProtocolTraits<Record> {
using frame_type = Frame;
using record_type = Record;
using state_type = NoState;
using key_type = stream_id_t;
static constexpr StreamSupport stream_support = BaseProtocolTraits<Record>::UseStream;
};

} // namespace cass
Expand Down

0 comments on commit e670eb6

Please sign in to comment.