From 4323109a649ff8227ea777facd01332558cb65fe Mon Sep 17 00:00:00 2001 From: kecona Date: Wed, 27 Sep 2023 11:45:25 +0800 Subject: [PATCH] [fix][broker] Miss headersAndPayload and messageIdData in MessagePublishContext --- .../pulsar/broker/service/Producer.java | 52 +++++++++++++------ .../pulsar/broker/service/ServerCnx.java | 6 ++- .../apache/pulsar/broker/service/Topic.java | 8 +++ 3 files changed, 49 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 6ad07a70a37964..19ed05f63824d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -184,14 +184,15 @@ public boolean isSuccessorTo(Producer other) { } public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize, - boolean isChunked, boolean isMarker) { + boolean isChunked, boolean isMarker, MessageIdData messageIdData) { if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize)) { - publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked, isMarker); + publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked, isMarker, messageIdData); } } public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId, - ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) { + ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker, + MessageIdData messageIdData) { if (lowestSequenceId > highestSequenceId) { cnx.execute(() -> { cnx.getCommandSender().sendSendError(producerId, highestSequenceId, ServerError.MetadataError, @@ -202,7 +203,7 @@ public void publishMessage(long producerId, long lowestSequenceId, long highestS } if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize)) { publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked, - isMarker); + isMarker, messageIdData); } } @@ -248,19 +249,20 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he } private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked, - boolean isMarker) { + boolean isMarker, MessageIdData messageIdData) { topic.publishMessage(headersAndPayload, MessagePublishContext.get(this, sequenceId, msgIn, - headersAndPayload.readableBytes(), batchSize, - isChunked, System.nanoTime(), isMarker)); + headersAndPayload, batchSize, + isChunked, System.nanoTime(), isMarker, messageIdData)); } private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId, - long batchSize, boolean isChunked, boolean isMarker) { + long batchSize, boolean isChunked, boolean isMarker, + MessageIdData messageIdData) { topic.publishMessage(headersAndPayload, MessagePublishContext.get(this, lowestSequenceId, - highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize, - isChunked, System.nanoTime(), isMarker)); + highestSequenceId, msgIn, headersAndPayload, batchSize, + isChunked, System.nanoTime(), isMarker, messageIdData)); } private boolean verifyChecksum(ByteBuf headersAndPayload) { @@ -357,6 +359,9 @@ private static final class MessagePublishContext implements PublishContext, Runn private long highestSequenceId; private long originalHighestSequenceId; + private MessageIdData messageIdData; + private ByteBuf headerAndPayload; + public String getProducerName() { return producer.getProducerName(); } @@ -493,19 +498,21 @@ public void run() { recycle(); } - static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize, - long batchSize, boolean chunked, long startTimeNs, boolean isMarker) { + static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, ByteBuf headersAndPayload, + long batchSize, boolean chunked, long startTimeNs, boolean isMarker, MessageIdData messageIdData) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = sequenceId; callback.rateIn = rateIn; - callback.msgSize = msgSize; + callback.msgSize = headersAndPayload.readableBytes(); callback.batchSize = batchSize; callback.chunked = chunked; callback.originalProducerName = null; callback.originalSequenceId = -1L; callback.startTimeNs = startTimeNs; callback.isMarker = isMarker; + callback.headerAndPayload = headersAndPayload; + callback.messageIdData = messageIdData; if (callback.propertyMap != null) { callback.propertyMap.clear(); } @@ -513,19 +520,22 @@ static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn } static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn, - int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker) { + ByteBuf headersAndPayload, long batchSize, boolean chunked, long startTimeNs, boolean isMarker, + MessageIdData messageIdData) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = lowestSequenceId; callback.highestSequenceId = highestSequenceId; callback.rateIn = rateIn; - callback.msgSize = msgSize; + callback.msgSize = headersAndPayload.readableBytes(); callback.batchSize = batchSize; callback.originalProducerName = null; callback.originalSequenceId = -1L; callback.startTimeNs = startTimeNs; callback.chunked = chunked; callback.isMarker = isMarker; + callback.headerAndPayload = headersAndPayload; + callback.messageIdData = messageIdData; if (callback.propertyMap != null) { callback.propertyMap.clear(); } @@ -542,6 +552,16 @@ public boolean isMarkerMessage() { return isMarker; } + @Override + public MessageIdData getMessageIdData() { + return messageIdData; + } + + @Override + public ByteBuf getHeaderAndPayload() { + return headerAndPayload; + } + private final Handle recyclerHandle; private MessagePublishContext(Handle recyclerHandle) { @@ -568,6 +588,8 @@ public void recycle() { startTimeNs = -1L; chunked = false; isMarker = false; + messageIdData = null; + headerAndPayload = null; if (propertyMap != null) { propertyMap.clear(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index fe43a0a147b8bb..db617eb4144227 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1543,13 +1543,15 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) { return; } + MessageIdData messageIdData = send.hasMessageId() ? send.getMessageId() : null; + // Persist the message if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) { producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(), - headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker()); + headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker(), messageIdData); } else { producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, - send.getNumMessages(), send.isIsChunk(), send.isMarker()); + send.getNumMessages(), send.isIsChunk(), send.isMarker(), messageIdData); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index e2ffb41390a7e4..6b54d333c34ac5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -112,6 +112,14 @@ default Object getProperty(String propertyName) { default boolean isChunked() { return false; } + + default MessageIdData getMessageIdData() { + return null; + } + + default ByteBuf getHeaderAndPayload() { + return null; + } } CompletableFuture initialize();