From 86ef4817e4f10d27cb70cee5aa818cf167c16d3e Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Tue, 22 Oct 2024 12:54:56 -0400 Subject: [PATCH] dekaf: Swap to sending control messages that count for consumer offsets We were translating acks into entirely invalid Kafka control messages in order to still send them and have consumers know about them, but filter them out from the documents a client sees. It turns out this translation was a bit too invalid, and the messages were getting filtered before their offsets were being counted for consumer group committed offsets. This changes the translation to use a valid control message version (0), but an invalid message _type_, which has the effect of causing the message to be a no-op, but still get counted for offset tracking. Relevant librdkafka section here: https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_msgset_reader.c#L768-L903 --- crates/dekaf/src/read.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/crates/dekaf/src/read.rs b/crates/dekaf/src/read.rs index 8f2c1f679b..4a61e2363a 100644 --- a/crates/dekaf/src/read.rs +++ b/crates/dekaf/src/read.rs @@ -218,19 +218,20 @@ impl Read { // ControlMessageKey => Version ControlMessageType // Version => int16 // ControlMessageType => int16 - // Skip control messages with version != 0: - // if (ctrl_data.Version != 0) { - // rd_kafka_buf_skip_to(rkbuf, message_end); - // return RD_KAFKA_RESP_ERR_NO_ERROR; /* Continue with next msg */ - // } + // Control messages with version > 0 are entirely ignored: // https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_msgset_reader.c#L777-L824 + // But, we don't want our message to be entirely ignored, + // we just don't want it to be returned to the client. + // If we send a valid version 0 control message, with an + // invalid message type (not 0 or 1), that should do what we want: + // https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_msgset_reader.c#L882-L902 // Control Message keys are always 4 bytes: - // Version: Any value != 0: i16 - buf.put_i16(9999); - // ControlMessageType: unused: i16 - buf.put_i16(9999); - record_bytes += 4; + // Version: 0i16 + buf.put_i16(0); + // ControlMessageType: >1 i16 + buf.put_i16(2); + record_bytes += buf.len(); Some(buf.split().freeze()) } else { tmp.push(0);