Skip to content

Commit

Permalink
dekaf: Swap to sending control messages that count for consumer offsets
Browse files Browse the repository at this point in the history
We were translating acks into entirely invalid Kafka control messages in order to still send them and have consumers know about them, but filter them out from the documents a client sees. It turns out this translation was a bit too invalid, and the messages were getting filtered before their offsets were being counted for consumer group committed offsets.

This changes the translation to use a valid control message version (0), but an invalid message _type_, which has the effect of causing the message to be a no-op, but still get counted for offset tracking.

Relevant librdkafka section here: https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_msgset_reader.c#L768-L903
  • Loading branch information
jshearer committed Oct 22, 2024
1 parent ac2fb0d commit 86ef481
Showing 1 changed file with 11 additions and 10 deletions.
21 changes: 11 additions & 10 deletions crates/dekaf/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,19 +218,20 @@ impl Read {
// ControlMessageKey => Version ControlMessageType
// Version => int16
// ControlMessageType => int16
// Skip control messages with version != 0:
// if (ctrl_data.Version != 0) {
// rd_kafka_buf_skip_to(rkbuf, message_end);
// return RD_KAFKA_RESP_ERR_NO_ERROR; /* Continue with next msg */
// }
// Control messages with version > 0 are entirely ignored:
// https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_msgset_reader.c#L777-L824
// But, we don't want our message to be entirely ignored,
// we just don't want it to be returned to the client.
// If we send a valid version 0 control message, with an
// invalid message type (not 0 or 1), that should do what we want:
// https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_msgset_reader.c#L882-L902

// Control Message keys are always 4 bytes:
// Version: Any value != 0: i16
buf.put_i16(9999);
// ControlMessageType: unused: i16
buf.put_i16(9999);
record_bytes += 4;
// Version: 0i16
buf.put_i16(0);
// ControlMessageType: >1 i16
buf.put_i16(2);
record_bytes += buf.len();
Some(buf.split().freeze())
} else {
tmp.push(0);
Expand Down

0 comments on commit 86ef481

Please sign in to comment.