Skip to content

Commit

Permalink
Publish Filters: do not block the pulsar-io thread and other performa…
Browse files Browse the repository at this point in the history
…nce improvements (#145)
  • Loading branch information
eolivelli authored May 13, 2024
1 parent 91110b8 commit 512d269
Show file tree
Hide file tree
Showing 7 changed files with 511 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public JMSFilter() {
public FilterResult filterEntry(Entry entry, FilterContext context) {
long start = System.nanoTime();
try {
return filterEntry(entry, context, false);
return filterEntry(entry, context, false, null);
} finally {
filterProcessingTime
.labels(context.getSubscription().getTopicName(), context.getSubscription().getName())
Expand Down Expand Up @@ -206,7 +206,11 @@ private boolean isHandleOnlySelectors(FilterContext context) {
return handleOnlySelectors;
}

public FilterResult filterEntry(Entry entry, FilterContext context, boolean onMessagePublish) {
public FilterResult filterEntry(
Entry entry,
FilterContext context,
boolean onMessagePublish,
MessageMetadataCache messageMetadataCache) {
Consumer consumer = context.getConsumer();
Map<String, String> consumerMetadata =
consumer != null ? consumer.getMetadata() : Collections.emptyMap();
Expand Down Expand Up @@ -279,7 +283,8 @@ public FilterResult filterEntry(Entry entry, FilterContext context, boolean onMe
selectorOnSubscription,
selector,
subscription,
consumerMetadata);
consumerMetadata,
messageMetadataCache);
}
} catch (Throwable err) {
log.error("Error while processing entry " + err, err);
Expand Down Expand Up @@ -325,13 +330,19 @@ private FilterResult processSingleMessageEntry(
SelectorSupport selectorOnSubscription,
SelectorSupport selector,
Subscription subscription,
Map<String, String> consumerMetadata)
Map<String, String> consumerMetadata,
MessageMetadataCache messageMetadataCache)
throws JMSException {
// here we are dealing with a single message,
// so we can reject the message more easily
PropertyEvaluator typedProperties =
new PropertyEvaluator(
metadata.getPropertiesCount(), metadata.getPropertiesList(), null, metadata, context);
metadata.getPropertiesCount(),
metadata.getPropertiesList(),
null,
metadata,
context,
messageMetadataCache);

if (selectorOnSubscription != null) {
boolean matchesSubscriptionFilter = matches(typedProperties, selectorOnSubscription);
Expand Down Expand Up @@ -414,7 +425,8 @@ private FilterResult processBatchEntry(
singleMessageMetadata.getPropertiesList(),
singleMessageMetadata,
null,
context);
context,
null);

// noLocal filter
// all the messages in the batch come from the Producer/Connection
Expand Down Expand Up @@ -545,8 +557,13 @@ private static class PropertyEvaluator implements Function<String, Object> {
private SingleMessageMetadata singleMessageMetadata;
private MessageMetadata metadata;
private FilterContext context;
private MessageMetadataCache messageMetadataCache;

private Object getProperty(String name) {
if (messageMetadataCache != null) {
return messageMetadataCache.getProperty(
name, n -> JMSFilter.getProperty(propertiesCount, propertiesList, n));
}
return JMSFilter.getProperty(propertiesCount, propertiesList, name);
}

Expand Down
Loading

0 comments on commit 512d269

Please sign in to comment.