
Commit

parse payload out of the ByteBuf
dlg99 committed May 3, 2024
1 parent 8e8c866 commit f16ac60
Showing 2 changed files with 47 additions and 6 deletions.
@@ -466,7 +466,11 @@ public void onMessagePublish(
     Map<String, Object> traceDetails = new TreeMap<>();
     traceDetails.put("producer", getProducerDetails(producer, traceSchema));
     traceDetails.put("publishContext", getPublishContextDetails(publishContext));
-    traceByteBuf("headersAndPayload", headersAndPayload, traceDetails, maxBinaryDataLength);
+
+    Map<String, Object> headersAndPayloadDetails = new TreeMap<>();
+    traceByteBuf(
+        "headersAndPayload", headersAndPayload, headersAndPayloadDetails, maxBinaryDataLength);
+    traceDetails.put("headersAndPayload", headersAndPayloadDetails);
 
     trace(EventReasons.MESSAGE, "Message publish", traceDetails);
   }
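For readers skimming the diff, a small illustrative sketch of the nesting the reworked call site produces: the payload details no longer go straight into the top-level traceDetails map, but are collected in their own sub-map and attached under a single "headersAndPayload" key. This sketch is not part of the commit; the class name and the values are made up.

import java.util.Map;
import java.util.TreeMap;

// Illustrative only: the map shape built by the call site above.
class TraceDetailsShapeSketch {
  static Map<String, Object> example() {
    Map<String, Object> traceDetails = new TreeMap<>();
    traceDetails.put("producer", Map.of("producerName", "example-producer")); // stand-in for getProducerDetails(...)
    traceDetails.put("publishContext", Map.of("sequenceId", 42L));            // stand-in for getPublishContextDetails(...)

    Map<String, Object> headersAndPayloadDetails = new TreeMap<>();
    // traceByteBuf(...) now fills this sub-map; the decoded payload string
    // lands under the "headersAndPayload" key inside it.
    headersAndPayloadDetails.put("headersAndPayload", "hello world");
    // The whole sub-map is then attached under one top-level key:
    traceDetails.put("headersAndPayload", headersAndPayloadDetails);
    return traceDetails;
  }
}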
@@ -27,6 +27,7 @@
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -45,7 +46,10 @@
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 
 @Slf4j
@@ -709,12 +713,45 @@ private static void populatePublishContext(
   public static void traceByteBuf(
       String key, ByteBuf buf, Map<String, Object> traceDetails, int maxBinaryDataLength) {
     if (buf == null || maxBinaryDataLength <= 0) return;
+    try {
 
-    if (buf.readableBytes() < maxBinaryDataLength) {
-      traceDetails.put(key, "0x" + Hex.encodeHexString(buf.nioBuffer()));
-    } else {
-      traceDetails.put(
-          key + "Slice", "0x" + Hex.encodeHexString(buf.slice(0, maxBinaryDataLength).nioBuffer()));
+      final ByteBuf metadataAndPayload = buf.retainedDuplicate();
+      ByteBuf uncompressedPayload = null;
+      try {
+        // advance readerIndex
+        MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
+
+        // todo: do we need to trace this metadata?
+        populateMessageMetadataDetails(metadata, traceDetails);
+
+        // Decode if needed
+        CompressionCodec codec =
+            CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
+        uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize());
+
+        // todo: does this require additional steps if messages are batched?
+        if (uncompressedPayload.readableBytes() < maxBinaryDataLength + 3) {
+          String dataAsString = uncompressedPayload.toString(StandardCharsets.UTF_8);
+          traceDetails.put(key, dataAsString);
+        } else {
+          String dataAsString =
+              uncompressedPayload.toString(0, maxBinaryDataLength, StandardCharsets.UTF_8);
+          traceDetails.put(key, dataAsString + "...");
+        }
+      } finally {
+        metadataAndPayload.release();
+        if (uncompressedPayload != null) {
+          uncompressedPayload.release();
+        }
+      }
+    } catch (Throwable t) {
+      log.error("Failed to convert ByteBuf to string", t);
+      if (buf.readableBytes() < maxBinaryDataLength + 3) {
+        traceDetails.put(key, "0x" + Hex.encodeHexString(buf.nioBuffer()));
+      } else {
+        traceDetails.put(
+            key, "0x" + Hex.encodeHexString(buf.slice(0, maxBinaryDataLength).nioBuffer()) + "...");
+      }
     }
   }
 }
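The rewritten traceByteBuf above parses the message metadata out of the buffer, decompresses the remaining bytes with the codec named in that metadata, and traces the payload as a UTF-8 string, falling back to a hex dump of the raw buffer if anything goes wrong; payloads shorter than maxBinaryDataLength + 3 bytes are traced whole, while longer ones are cut at maxBinaryDataLength and suffixed with "...". Below is a self-contained sketch of that happy path. It is not part of the commit, assumes Pulsar's Commands.serializeMetadataAndPayload is available to frame a test buffer the way a producer would, and uses a made-up class name and values.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;
import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.protocol.Commands;

public class ParsePayloadSketch {
  public static void main(String[] args) throws Exception {
    byte[] data = "hello world".getBytes(StandardCharsets.UTF_8);

    // Minimal metadata a producer would attach; values are illustrative.
    MessageMetadata metadata = new MessageMetadata();
    metadata.setProducerName("sketch-producer");
    metadata.setSequenceId(0L);
    metadata.setPublishTime(System.currentTimeMillis());
    metadata.setCompression(CompressionType.NONE);
    metadata.setUncompressedSize(data.length);

    // Frame [checksum][metadata][payload] the way it arrives in onMessagePublish.
    ByteBuf headersAndPayload =
        Commands.serializeMetadataAndPayload(
            Commands.ChecksumType.Crc32c, metadata, Unpooled.wrappedBuffer(data));

    // Same sequence as the new traceByteBuf: parse the metadata (this advances the
    // readerIndex past the header), then decode the rest with the codec it names.
    MessageMetadata parsed = Commands.parseMessageMetadata(headersAndPayload);
    CompressionCodec codec =
        CompressionCodecProvider.getCompressionCodec(parsed.getCompression());
    ByteBuf uncompressedPayload =
        codec.decode(headersAndPayload, parsed.getUncompressedSize());
    try {
      System.out.println(uncompressedPayload.toString(StandardCharsets.UTF_8)); // "hello world"
    } finally {
      uncompressedPayload.release();
      headersAndPayload.release();
    }
  }
}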
