Skip to content

Commit

Permalink
Fix parsing metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed May 7, 2024
1 parent 17bf86c commit f73df6c
Showing 1 changed file with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.protocol.Commands;
import org.jetbrains.annotations.NotNull;

@Slf4j
public class JMSPublishFilters implements BrokerInterceptor {
Expand Down Expand Up @@ -284,8 +284,7 @@ private void filterAndAckMessage(
// this operation has been enqueued before the broker shutdown
return;
}
MessageMetadata messageMetadata = new MessageMetadata();
Commands.parseMessageMetadata(messageMetadataUnparsed, messageMetadata);
MessageMetadata messageMetadata = getMessageMetadata(messageMetadataUnparsed);
long now = System.nanoTime();
try {
FilterContext filterContext = new FilterContext();
Expand Down Expand Up @@ -317,6 +316,17 @@ private void filterAndAckMessage(
}
}

@NotNull
private static MessageMetadata getMessageMetadata(ByteBuf messageMetadataUnparsed) {
MessageMetadata messageMetadata = new MessageMetadata();
synchronized (messageMetadataUnparsed) {
int index = messageMetadataUnparsed.readerIndex();
messageMetadata.parseFrom(messageMetadataUnparsed, messageMetadataUnparsed.readableBytes());
messageMetadataUnparsed.readerIndex(index);
}
return messageMetadata;
}

private static void scheduleOnDispatchThread(Subscription subscription, Runnable runnable) {
try {
Dispatcher dispatcher = subscription.getDispatcher();
Expand Down

0 comments on commit f73df6c

Please sign in to comment.