From 07bd620e9a30b66ddcb54f540be3ad7f18cf71ff Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 9 Oct 2024 18:19:42 +0300 Subject: [PATCH 01/15] [improve][broker][PIP-379] Add observability stats for "draining hashes" --- .../pulsar/broker/service/Consumer.java | 7 ++ .../broker/service/DrainingHashesTracker.java | 83 +++++++++++++++++++ ...tStickyKeyDispatcherMultipleConsumers.java | 2 + .../persistent/PersistentSubscription.java | 3 + .../common/policies/data/ConsumerStats.java | 32 +++++++ .../policies/data/SubscriptionStats.java | 24 ++++++ .../data/stats/ConsumerStatsImpl.java | 26 ++++++ .../data/stats/SubscriptionStatsImpl.java | 22 +++++ 8 files changed, 199 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index d25ebd0839df1..bcd29d86490cf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -174,6 +174,10 @@ public class Consumer { @Setter private volatile PendingAcksMap.PendingAcksRemoveHandler pendingAcksRemoveHandler; + @Getter + @Setter + private volatile java.util.function.BiConsumer drainingHashesConsumerStatsUpdater; + public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName, boolean isDurable, TransportCnx cnx, String appId, @@ -976,6 +980,9 @@ public ConsumerStatsImpl getStats() { if (readPositionWhenJoining != null) { stats.readPositionWhenJoining = readPositionWhenJoining.toString(); } + if (drainingHashesConsumerStatsUpdater != null) { + drainingHashesConsumerStatsUpdater.accept(this, stats); + } return stats; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java index 3521fa197a13d..23f2c9deb1152 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java @@ -19,9 +19,16 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.service.StickyKeyConsumerSelector.STICKY_KEY_HASH_NOT_SET; +import it.unimi.dsi.fastutil.ints.Int2IntMap; +import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import java.util.Map; +import java.util.PrimitiveIterator; +import java.util.concurrent.ConcurrentHashMap; import lombok.ToString; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; +import org.roaringbitmap.RoaringBitmap; /** * A thread-safe map to store draining hashes in the consumer. @@ -34,6 +41,8 @@ public class DrainingHashesTracker { private final Int2ObjectOpenHashMap drainingHashes = new Int2ObjectOpenHashMap<>(); int batchLevel; boolean unblockedWhileBatching; + private final Map consumerDrainingHashesStatsMap = + new ConcurrentHashMap<>(); /** * Represents an entry in the draining hashes tracker. @@ -98,6 +107,45 @@ boolean isBlocking() { } } + private class ConsumerDrainingHashesStats { + private final RoaringBitmap drainingHashes = new RoaringBitmap(); + long drainingHashesClearedTotal; + + public synchronized void addHash(int stickyHash) { + drainingHashes.add(stickyHash); + } + + public synchronized boolean clearHash(int hash) { + drainingHashes.remove(hash); + drainingHashesClearedTotal++; + return drainingHashes.isEmpty(); + } + + public synchronized void updateConsumerStats(Consumer consumer, ConsumerStatsImpl consumerStats) { + int drainingHashesCount = 0; + int drainingHashesUnackedMessages = 0; + Int2IntMap drainingHashesUnackedMessagesByHash = new Int2IntOpenHashMap(); + PrimitiveIterator.OfInt hashIterator = drainingHashes.stream().iterator(); + while (hashIterator.hasNext()) { + int hash = hashIterator.nextInt(); + DrainingHashEntry entry = getEntry(hash); + if (entry == null) { + log.warn("[{}] Draining hash {} not found in the tracker for consumer {}", dispatcherName, hash, + consumer); + continue; + } + int unackedMessage = entry.refCount; + drainingHashesUnackedMessagesByHash.put(hash, unackedMessage); + drainingHashesUnackedMessages += unackedMessage; + drainingHashesCount++; + } + consumerStats.drainingHashesCount = drainingHashesCount; + consumerStats.drainingHashesClearedTotal = drainingHashesClearedTotal; + consumerStats.drainingHashesUnackedMessages = drainingHashesUnackedMessages; + consumerStats.drainingHashesUnackedMessagesByHash = drainingHashesUnackedMessagesByHash; + } + } + /** * Interface for handling the unblocking of sticky key hashes. */ @@ -129,6 +177,9 @@ public synchronized void addEntry(Consumer consumer, int stickyHash) { if (entry == null) { entry = new DrainingHashEntry(consumer); drainingHashes.put(stickyHash, entry); + // update the consumer specific stats + consumerDrainingHashesStatsMap.computeIfAbsent(new ConsumerIdentityWrapper(consumer), + k -> new ConsumerDrainingHashesStats()).addHash(stickyHash); } else if (entry.getConsumer() != consumer) { throw new IllegalStateException( "Consumer " + entry.getConsumer() + " is already draining hash " + stickyHash @@ -179,6 +230,15 @@ public synchronized void reduceRefCount(Consumer consumer, int stickyHash, boole } if (entry.decrementRefCount()) { DrainingHashEntry removed = drainingHashes.remove(stickyHash); + // update the consumer specific stats + consumerDrainingHashesStatsMap.compute(new ConsumerIdentityWrapper(consumer), + (key, consumerDrainingHashesStats) -> { + if (consumerDrainingHashesStats != null && consumerDrainingHashesStats.clearHash(stickyHash)) { + // remove the consumer specific stats if all hashes are cleared + return null; + } + return consumerDrainingHashesStats; + }); if (!closing && removed.isBlocking()) { if (batchLevel > 0) { unblockedWhileBatching = true; @@ -237,5 +297,28 @@ public synchronized DrainingHashEntry getEntry(int stickyKeyHash) { */ public synchronized void clear() { drainingHashes.clear(); + consumerDrainingHashesStatsMap.clear(); + } + + /** + * Update the consumer specific stats to the target {@link ConsumerStatsImpl}. + * + * @param consumer the consumer + * @param consumerStats the consumer stats to update the values to + */ + public void updateConsumerStats(Consumer consumer, ConsumerStatsImpl consumerStats) { + ConsumerDrainingHashesStats consumerDrainingHashesStats = + consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer)); + if (consumerDrainingHashesStats != null) { + consumerDrainingHashesStats.updateConsumerStats(consumer, consumerStats); + } + } + + /** + * Remove the consumer specific stats from the draining hashes tracker. + * @param consumer the consumer + */ + public void consumerRemoved(Consumer consumer) { + consumerDrainingHashesStatsMap.remove(new ConsumerIdentityWrapper(consumer)); } } \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index df053e6d8a549..4e61a2c0ed38e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -157,6 +157,7 @@ public void endBatch() { drainingHashesTracker.endBatch(); } }); + consumer.setDrainingHashesConsumerStatsUpdater(drainingHashesTracker::updateConsumerStats); registerDrainingHashes(consumer, impactedConsumers.orElseThrow()); } }).exceptionally(ex -> { @@ -193,6 +194,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE // consumer to another. This will handle the case where a hash gets switched from an existing // consumer to another existing consumer during removal. registerDrainingHashes(consumer, impactedConsumers.orElseThrow()); + drainingHashesTracker.consumerRemoved(consumer); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index eaa147b81b126..0aeabde59c07a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -1258,6 +1258,9 @@ public CompletableFuture getStatsAsync(GetStatsOptions ge .map(Range::toString) .collect(Collectors.toList()); } + subStats.drainingHashesCount += consumerStats.drainingHashesCount; + subStats.drainingHashesClearedTotal += consumerStats.drainingHashesClearedTotal; + subStats.drainingHashesUnackedMessages += consumerStats.drainingHashesUnackedMessages; }); subStats.filterProcessedMsgCount = dispatcher.getFilterProcessedMsgCount(); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java index d2d3600df96ed..a472e84d84f0c 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java @@ -75,6 +75,38 @@ public interface ConsumerStats { /** The read position of the cursor when the consumer joining. */ String getReadPositionWhenJoining(); + /** + * For Key_Shared subscription in AUTO_SPLIT ordered mode: + * Retrieves the current number of hashes in the draining state for this consumer. + * + * @return the current number of hashes in the draining state for this consumer + */ + int getDrainingHashesCount(); + + /** + * For Key_Shared subscription in AUTO_SPLIT ordered mode: + * Retrieves the total number of hashes cleared from the draining state since the consumer connected. + * + * @return the total number of hashes cleared from the draining state since the consumer connected + */ + long getDrainingHashesClearedTotal(); + + /** + * For Key_Shared subscription in AUTO_SPLIT ordered mode: + * Retrieves the total number of unacked messages for all draining hashes for this consumer. + * + * @return the total number of unacked messages for all draining hashes for this consumer + */ + int getDrainingHashesUnackedMessages(); + + /** + * For Key_Shared subscription in AUTO_SPLIT ordered mode: + * Retrieves the number of unacked messages grouped by their hash values. + * + * @return a map where the key is the hash value and the value is the number of unacked messages for that hash + */ + Map getDrainingHashesUnackedMessagesByHash(); + /** Address of this consumer. */ String getAddress(); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java index ce3a080a855da..95e7c65266bff 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java @@ -121,6 +121,30 @@ public interface SubscriptionStats { /** This is for Key_Shared subscription to get the recentJoinedConsumers in the Key_Shared subscription. */ Map getConsumersAfterMarkDeletePosition(); + /** + * For Key_Shared subscription in AUTO_SPLIT ordered mode: + * Retrieves the current number of hashes in the draining state. + * + * @return the current number of hashes in the draining state + */ + int getDrainingHashesCount(); + + /** + * For Key_Shared subscription in AUTO_SPLIT ordered mode: + * Retrieves the total number of hashes cleared from the draining state for the connected consumers. + * + * @return the total number of hashes cleared from the draining state for the connected consumers + */ + long getDrainingHashesClearedTotal(); + + /** + * For Key_Shared subscription in AUTO_SPLIT ordered mode: + * Retrieves the total number of unacked messages for all draining hashes. + * + * @return the total number of unacked messages for all draining hashes + */ + int getDrainingHashesUnackedMessages(); + /** SubscriptionProperties (key/value strings) associated with this subscribe. */ Map getSubscriptionProperties(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java index de36b330b7f1a..9c103978afed5 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java @@ -80,6 +80,28 @@ public class ConsumerStatsImpl implements ConsumerStats { /** The read position of the cursor when the consumer joining. */ public String readPositionWhenJoining; + /** + * For Key_Shared AUTO_SPLIT ordered subscriptions: The current number of hashes in the draining state. + */ + public int drainingHashesCount; + + /** + * For Key_Shared AUTO_SPLIT ordered subscriptions: The total number of hashes cleared from the draining state for + * the consumer. + */ + public long drainingHashesClearedTotal; + + /** + * For Key_Shared AUTO_SPLIT ordered subscriptions: The total number of unacked messages for all draining hashes. + */ + public int drainingHashesUnackedMessages; + + /** + * For Key_Shared subscription in AUTO_SPLIT ordered mode: + * a map where the key is the hash value and the value is the number of unacked messages for that hash. + */ + public Map drainingHashesUnackedMessagesByHash; + /** Address of this consumer. */ private String address; /** Timestamp of connection. */ @@ -114,6 +136,10 @@ public ConsumerStatsImpl add(ConsumerStatsImpl stats) { this.unackedMessages += stats.unackedMessages; this.blockedConsumerOnUnackedMsgs = stats.blockedConsumerOnUnackedMsgs; this.readPositionWhenJoining = stats.readPositionWhenJoining; + this.drainingHashesCount = stats.drainingHashesCount; + this.drainingHashesClearedTotal += stats.drainingHashesClearedTotal; + this.drainingHashesUnackedMessages = stats.drainingHashesUnackedMessages; + this.drainingHashesUnackedMessagesByHash = stats.drainingHashesUnackedMessagesByHash; return this; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java index 12734a5586cef..02df9b7870023 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java @@ -129,6 +129,22 @@ public class SubscriptionStatsImpl implements SubscriptionStats { /** This is for Key_Shared subscription to get the recentJoinedConsumers in the Key_Shared subscription. */ public Map consumersAfterMarkDeletePosition; + /** + * For Key_Shared AUTO_SPLIT ordered subscriptions: The current number of hashes in the draining state. + */ + public int drainingHashesCount; + + /** + * For Key_Shared AUTO_SPLIT ordered subscriptions: The total number of hashes cleared from the draining state + * for the connected consumers. + */ + public long drainingHashesClearedTotal; + + /** + * For Key_Shared AUTO_SPLIT ordered subscriptions: The total number of unacked messages for all draining hashes. + */ + public int drainingHashesUnackedMessages; + /** The number of non-contiguous deleted messages ranges. */ public int nonContiguousDeletedMessagesRanges; @@ -180,6 +196,9 @@ public void reset() { lastMarkDeleteAdvancedTimestamp = 0L; consumers.clear(); consumersAfterMarkDeletePosition.clear(); + drainingHashesCount = 0; + drainingHashesClearedTotal = 0L; + drainingHashesUnackedMessages = 0; nonContiguousDeletedMessagesRanges = 0; nonContiguousDeletedMessagesRangesSerializedSize = 0; earliestMsgPublishTimeInBacklog = 0L; @@ -226,6 +245,9 @@ public SubscriptionStatsImpl add(SubscriptionStatsImpl stats) { } this.allowOutOfOrderDelivery |= stats.allowOutOfOrderDelivery; this.consumersAfterMarkDeletePosition.putAll(stats.consumersAfterMarkDeletePosition); + this.drainingHashesCount += stats.drainingHashesCount; + this.drainingHashesClearedTotal += stats.drainingHashesClearedTotal; + this.drainingHashesUnackedMessages += stats.drainingHashesUnackedMessages; this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges; this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize; if (this.earliestMsgPublishTimeInBacklog != 0 && stats.earliestMsgPublishTimeInBacklog != 0) { From 16792f0de63e427de938997d6ecca011acd995be Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 10 Oct 2024 00:01:51 +0300 Subject: [PATCH 02/15] Use keyHashRanges for classic dispatcher and add keyHashRangeArrays with array format The String format is very inefficient. It's better to replace it for PIP-379 This is needed for tests --- .../persistent/PersistentSubscription.java | 17 +++++++++++++---- .../common/policies/data/ConsumerStats.java | 15 +++++++++++++-- .../policies/data/stats/ConsumerStatsImpl.java | 14 +++++++++++++- 3 files changed, 39 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 0aeabde59c07a..df1c23cbbcb30 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -1253,10 +1253,19 @@ public CompletableFuture getStatsAsync(GetStatsOptions ge subStats.lastConsumedTimestamp = Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp); subStats.lastAckedTimestamp = Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp); - if (consumerKeyHashRanges != null && consumerKeyHashRanges.containsKey(consumer)) { - consumerStats.keyHashRanges = consumerKeyHashRanges.get(consumer).stream() - .map(Range::toString) - .collect(Collectors.toList()); + List keyRanges = consumerKeyHashRanges != null ? consumerKeyHashRanges.get(consumer) : null; + if (keyRanges != null) { + if (((StickyKeyDispatcher) dispatcher).isClassic()) { + // Use string representation for classic mode + consumerStats.keyHashRanges = keyRanges.stream() + .map(Range::toString) + .collect(Collectors.toList()); + } else { + // Use array representation for PIP-379 stats + consumerStats.keyHashRangeArrays = keyRanges.stream() + .map(range -> new int[]{range.getStart(), range.getEnd()}) + .collect(Collectors.toList()); + } } subStats.drainingHashesCount += consumerStats.drainingHashesCount; subStats.drainingHashesClearedTotal += consumerStats.drainingHashesClearedTotal; diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java index a472e84d84f0c..217763b960eb2 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java @@ -120,9 +120,20 @@ public interface ConsumerStats { long getLastConsumedTimestamp(); long getLastConsumedFlowTimestamp(); - /** Hash ranges assigned to this consumer if is Key_Shared sub mode. **/ + /** + * Hash ranges assigned to this consumer if in Key_Shared subscription mode. + * This format and field is used when `subscriptionKeySharedUseClassicPersistentImplementation` is set to `false` + * (default). + */ + List getKeyHashRangeArrays(); + + /** + * Hash ranges assigned to this consumer if in Key_Shared subscription mode. + * This format and field is used when `subscriptionKeySharedUseClassicPersistentImplementation` is set to `true`. + */ + @Deprecated List getKeyHashRanges(); /** Metadata (key/value strings) associated with this consumer. */ Map getMetadata(); -} +} \ No newline at end of file diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java index 9c103978afed5..77014db24ab32 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java @@ -118,7 +118,17 @@ public class ConsumerStatsImpl implements ConsumerStats { public long lastConsumedFlowTimestamp; - /** Hash ranges assigned to this consumer if is Key_Shared sub mode. **/ + /** + * Hash ranges assigned to this consumer if in Key_Shared subscription mode. + * This format and field is used when `subscriptionKeySharedUseClassicPersistentImplementation` is set to `false` + * (default). + */ + public List keyHashRangeArrays; + + /** + * Hash ranges assigned to this consumer if in Key_Shared subscription mode. + * This format and field is used when `subscriptionKeySharedUseClassicPersistentImplementation` is set to `true`. + */ public List keyHashRanges; /** Metadata (key/value strings) associated with this consumer. */ @@ -140,6 +150,8 @@ public ConsumerStatsImpl add(ConsumerStatsImpl stats) { this.drainingHashesClearedTotal += stats.drainingHashesClearedTotal; this.drainingHashesUnackedMessages = stats.drainingHashesUnackedMessages; this.drainingHashesUnackedMessagesByHash = stats.drainingHashesUnackedMessagesByHash; + this.keyHashRanges = stats.keyHashRanges; + this.keyHashRangeArrays = stats.keyHashRangeArrays; return this; } From cd0be0ac57b22329fef6e799986a439520017f22 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 10 Oct 2024 00:48:18 +0300 Subject: [PATCH 03/15] Add DrainingHash for stats --- .../broker/service/DrainingHashesTracker.java | 24 ++++++---- .../common/policies/data/ConsumerStats.java | 6 +-- .../common/policies/data/DrainingHash.java | 41 +++++++++++++++++ .../data/stats/ConsumerStatsImpl.java | 9 ++-- .../policies/data/stats/DrainingHashImpl.java | 46 +++++++++++++++++++ .../common/util/ObjectMapperFactory.java | 3 ++ 6 files changed, 113 insertions(+), 16 deletions(-) create mode 100644 pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DrainingHash.java create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/DrainingHashImpl.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java index 23f2c9deb1152..2b2c1770b8a8d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java @@ -19,15 +19,17 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.service.StickyKeyConsumerSelector.STICKY_KEY_HASH_NOT_SET; -import it.unimi.dsi.fastutil.ints.Int2IntMap; -import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.PrimitiveIterator; import java.util.concurrent.ConcurrentHashMap; import lombok.ToString; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.policies.data.DrainingHash; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; +import org.apache.pulsar.common.policies.data.stats.DrainingHashImpl; import org.roaringbitmap.RoaringBitmap; /** @@ -122,9 +124,8 @@ public synchronized boolean clearHash(int hash) { } public synchronized void updateConsumerStats(Consumer consumer, ConsumerStatsImpl consumerStats) { - int drainingHashesCount = 0; int drainingHashesUnackedMessages = 0; - Int2IntMap drainingHashesUnackedMessagesByHash = new Int2IntOpenHashMap(); + List drainingHashesStats = new ArrayList<>(); PrimitiveIterator.OfInt hashIterator = drainingHashes.stream().iterator(); while (hashIterator.hasNext()) { int hash = hashIterator.nextInt(); @@ -134,15 +135,18 @@ public synchronized void updateConsumerStats(Consumer consumer, ConsumerStatsImp consumer); continue; } - int unackedMessage = entry.refCount; - drainingHashesUnackedMessagesByHash.put(hash, unackedMessage); - drainingHashesUnackedMessages += unackedMessage; - drainingHashesCount++; + int unackedMessages = entry.refCount; + DrainingHashImpl drainingHash = new DrainingHashImpl(); + drainingHash.hash = hash; + drainingHash.unackMsgs = unackedMessages; + drainingHash.blockedAttempts = entry.blockedCount; + drainingHashesStats.add(drainingHash); + drainingHashesUnackedMessages += unackedMessages; } - consumerStats.drainingHashesCount = drainingHashesCount; + consumerStats.drainingHashesCount = drainingHashesStats.size(); consumerStats.drainingHashesClearedTotal = drainingHashesClearedTotal; consumerStats.drainingHashesUnackedMessages = drainingHashesUnackedMessages; - consumerStats.drainingHashesUnackedMessagesByHash = drainingHashesUnackedMessagesByHash; + consumerStats.drainingHashes = drainingHashesStats; } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java index 217763b960eb2..5b2f8adcd2608 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java @@ -101,11 +101,11 @@ public interface ConsumerStats { /** * For Key_Shared subscription in AUTO_SPLIT ordered mode: - * Retrieves the number of unacked messages grouped by their hash values. + * Retrieves the draining hashes for this consumer. * - * @return a map where the key is the hash value and the value is the number of unacked messages for that hash + * @return a list of draining hashes for this consumer */ - Map getDrainingHashesUnackedMessagesByHash(); + List getDrainingHashes(); /** Address of this consumer. */ String getAddress(); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DrainingHash.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DrainingHash.java new file mode 100644 index 0000000000000..685b0b74e64b9 --- /dev/null +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DrainingHash.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +/** + * Contains information about a draining hash in a Key_Shared subscription. + * @see ConsumerStats + */ +public interface DrainingHash { + /** + * Get the sticky key hash value of the draining hash. + * @return the sticky hash value + */ + int getHash(); + /** + * Get number of unacknowledged messages for the draining hash. + * @return number of unacknowledged messages + */ + int getUnackMsgs(); + /** + * Get the number of times the hash has blocked an attempted delivery of a message. + * @return number of times the hash has blocked an attempted delivery of a message + */ + int getBlockedAttempts(); +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java index 77014db24ab32..8811247cb2de3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java @@ -23,6 +23,7 @@ import java.util.Objects; import lombok.Data; import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.DrainingHash; import org.apache.pulsar.common.util.DateFormatter; /** @@ -98,9 +99,11 @@ public class ConsumerStatsImpl implements ConsumerStats { /** * For Key_Shared subscription in AUTO_SPLIT ordered mode: - * a map where the key is the hash value and the value is the number of unacked messages for that hash. + * Retrieves the draining hashes for this consumer. + * + * @return a list of draining hashes for this consumer */ - public Map drainingHashesUnackedMessagesByHash; + public List drainingHashes; /** Address of this consumer. */ private String address; @@ -149,7 +152,7 @@ public ConsumerStatsImpl add(ConsumerStatsImpl stats) { this.drainingHashesCount = stats.drainingHashesCount; this.drainingHashesClearedTotal += stats.drainingHashesClearedTotal; this.drainingHashesUnackedMessages = stats.drainingHashesUnackedMessages; - this.drainingHashesUnackedMessagesByHash = stats.drainingHashesUnackedMessagesByHash; + this.drainingHashes = stats.drainingHashes; this.keyHashRanges = stats.keyHashRanges; this.keyHashRangeArrays = stats.keyHashRangeArrays; return this; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/DrainingHashImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/DrainingHashImpl.java new file mode 100644 index 0000000000000..134bdac597b7c --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/DrainingHashImpl.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data.stats; + +import lombok.Data; +import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.DrainingHash; + +/** + * Contains information about a draining hash in a Key_Shared subscription. + * @see ConsumerStats + */ +@Data +public class DrainingHashImpl implements DrainingHash { + /** + * Get the sticky key hash value of the draining hash. + * @return the sticky hash value + */ + public int hash; + /** + * Get number of unacknowledged messages for the draining hash. + * @return number of unacknowledged messages + */ + public int unackMsgs; + /** + * Get the number of times the hash has blocked an attempted delivery of a message. + * @return number of times the hash has blocked an attempted delivery of a message + */ + public int blockedAttempts; +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java index 7b235cfa341d1..b737d68d5ea9f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java @@ -56,6 +56,7 @@ import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.DrainingHash; import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.FailureDomainImpl; import org.apache.pulsar.common.policies.data.FunctionInstanceStats; @@ -96,6 +97,7 @@ import org.apache.pulsar.common.policies.data.impl.DelayedDeliveryPoliciesImpl; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; +import org.apache.pulsar.common.policies.data.stats.DrainingHashImpl; import org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopicStatsImpl; import org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl; import org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl; @@ -243,6 +245,7 @@ private static void setAnnotationsModule(ObjectMapper mapper) { resolver.addMapping(DispatchRate.class, DispatchRateImpl.class); resolver.addMapping(TopicStats.class, TopicStatsImpl.class); resolver.addMapping(ConsumerStats.class, ConsumerStatsImpl.class); + resolver.addMapping(DrainingHash.class, DrainingHashImpl.class); resolver.addMapping(NonPersistentPublisherStats.class, NonPersistentPublisherStatsImpl.class); resolver.addMapping(NonPersistentReplicatorStats.class, NonPersistentReplicatorStatsImpl.class); resolver.addMapping(NonPersistentSubscriptionStats.class, NonPersistentSubscriptionStatsImpl.class); From efc877bf0fd06d7304410f3ffdb5507efe713199 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 10 Oct 2024 01:56:07 +0300 Subject: [PATCH 04/15] Add deprecated to getReadPositionWhenJoining --- .../org/apache/pulsar/common/policies/data/ConsumerStats.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java index 5b2f8adcd2608..16dce5903f492 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java @@ -73,6 +73,7 @@ public interface ConsumerStats { boolean isBlockedConsumerOnUnackedMsgs(); /** The read position of the cursor when the consumer joining. */ + @Deprecated String getReadPositionWhenJoining(); /** From 71866834022360df48a777bfed217069180c6ed0 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 10 Oct 2024 01:53:59 +0300 Subject: [PATCH 05/15] Add test utilities to log the topic stats in raw format --- .../apache/pulsar/broker/BrokerTestUtil.java | 78 ++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java index 7ed4542b2505f..6a41e86f8934e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java @@ -21,10 +21,15 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; import java.io.StringWriter; import java.io.UncheckedIOException; +import java.net.HttpURLConnection; +import java.net.URL; import java.time.Duration; import java.util.Arrays; import java.util.UUID; @@ -37,6 +42,7 @@ import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.stream.Stream; +import lombok.SneakyThrows; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; @@ -46,7 +52,6 @@ import org.apache.pulsar.common.util.ObjectMapperFactory; import org.mockito.Mockito; import org.slf4j.Logger; - /** * Holds util methods used in test. */ @@ -136,6 +141,77 @@ public static void logTopicStats(Logger logger, PulsarAdmin pulsarAdmin, String } } + /** + * Logs the topic stats and internal stats for the given topic + * @param logger logger to use + * @param baseUrl Pulsar service URL + * @param topic topic name + */ + public static void logTopicStats(Logger logger, String baseUrl, String topic) { + logTopicStats(logger, baseUrl, "public", "default", topic); + } + + /** + * Logs the topic stats and internal stats for the given topic + * @param logger logger to use + * @param baseUrl Pulsar service URL + * @param tenant tenant name + * @param namespace namespace name + * @param topic topic name + */ + public static void logTopicStats(Logger logger, String baseUrl, String tenant, String namespace, String topic) { + String topicStatsUri = + String.format("%s/admin/v2/persistent/%s/%s/%s/stats", baseUrl, tenant, namespace, topic); + logger.info("[{}] stats: {}", topic, jsonPrettyPrint(getJsonResourceAsString(topicStatsUri))); + String topicStatsInternalUri = + String.format("%s/admin/v2/persistent/%s/%s/%s/internalStats", baseUrl, tenant, namespace, topic); + logger.info("[{}] internalStats: {}", topic, jsonPrettyPrint(getJsonResourceAsString(topicStatsInternalUri))); + } + + /** + * Pretty print the given JSON string + * @param jsonString JSON string to pretty print + * @return pretty printed JSON string + */ + public static String jsonPrettyPrint(String jsonString) { + try { + ObjectMapper mapper = new ObjectMapper(); + Object json = mapper.readValue(jsonString, Object.class); + ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter(); + return writer.writeValueAsString(json); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + /** + * Get the resource as a string from the given URI + */ + @SneakyThrows + public static String getJsonResourceAsString(String uri) { + URL url = new URL(uri); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("GET"); + connection.setRequestProperty("Accept", "application/json"); + try { + int responseCode = connection.getResponseCode(); + if (responseCode == 200) { + try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) { + String inputLine; + StringBuilder content = new StringBuilder(); + while ((inputLine = in.readLine()) != null) { + content.append(inputLine); + } + return content.toString(); + } + } else { + throw new IOException("Failed to get resource: " + uri + ", status: " + responseCode); + } + } finally { + connection.disconnect(); + } + } + /** * Receive messages concurrently from multiple consumers and handles them using the provided message handler. * The message handler should return true if it wants to continue receiving more messages, false otherwise. From 1bda07e2821b01cc366d2cd12a2ff5c2612d5ac9 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 9 Oct 2024 23:15:21 +0300 Subject: [PATCH 06/15] Add test for stats --- .../broker/stats/ConsumerStatsTest.java | 164 ++++++++++++++++++ 1 file changed, 164 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index 5b2998216e8e1..5683ebda79cf2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -18,9 +18,11 @@ */ package org.apache.pulsar.broker.stats; +import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric; import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertNotEquals; import static org.testng.AssertJUnit.assertEquals; @@ -31,22 +33,29 @@ import java.io.ByteArrayOutputStream; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.PrometheusMetricsTestUtil; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.PendingAcksMap; +import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; +import org.apache.pulsar.broker.service.StickyKeyDispatcher; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -67,10 +76,14 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.DrainingHash; +import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.assertj.core.groups.Tuple; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -481,4 +494,155 @@ public void testNonPersistentTopicSharedSubscriptionUnackedMessages() throws Exc assertEquals(0, consumers.get(0).getUnackedMessages()); } + @Test + public void testKeySharedDrainingHashesConsumerStats() throws Exception { + conf.setSubscriptionKeySharedUseConsistentHashing(true); + conf.setSubscriptionKeySharedConsistentHashingReplicaPoints(100); + String topic = newUniqueName("testKeySharedDrainingHashesConsumerStats"); + String subscriptionName = "sub"; + int numberOfKeys = 10; + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .enableBatching(false) + .create(); + + @Cleanup + Consumer c1 = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .consumerName("c1") + .receiverQueueSize(100) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Key_Shared) + .subscribe(); + + StickyKeyDispatcher dispatcher = getDispatcher(topic, subscriptionName); + StickyKeyConsumerSelector selector = dispatcher.getSelector(); + + for (int i = 0; i < 20; i++) { + String key = String.valueOf(i % numberOfKeys); + int stickyKeyHash = selector.makeStickyKeyHash(key.getBytes(StandardCharsets.UTF_8)); + log.info("Sending message with value {} key {} hash {}", key, i, stickyKeyHash); + producer.newMessage() + .key(key) + .value(i) + .send(); + } + + PendingAcksMap c1PendingAcks = dispatcher.getConsumers().get(0).getPendingAcks(); + // Wait until all the already published messages have been pre-fetched by C1. + Awaitility.await().ignoreExceptions().until(() -> c1PendingAcks.size() == 20); + + // Adding a new consumer. + @Cleanup + Consumer c2 = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .consumerName("c2") + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Key_Shared) + .subscribe(); + + //BrokerTestUtil.logTopicStats(log, pulsar.getWebServiceAddress(), topic); + + SubscriptionStats subscriptionStats = + admin.topics().getStats(topic).getSubscriptions().get(subscriptionName); + ConsumerStats c1Stats = subscriptionStats.getConsumers().get(0); + ConsumerStats c2Stats = subscriptionStats.getConsumers().get(1); + + Set c2HashesByStats = new HashSet<>(); + Set c2HashesByDispatcher = new HashSet<>(); + + Map c1DrainingHashesExpected = new HashMap<>(); + for (int i = 0; i < 20; i++) { + String key = String.valueOf(i % numberOfKeys); + int hash = selector.makeStickyKeyHash(key.getBytes(StandardCharsets.UTF_8)); + if ("c2".equals(findConsumerNameForHash(subscriptionStats, hash))) { + c2HashesByStats.add(hash); + } + org.apache.pulsar.broker.service.Consumer selected = selector.select(hash); + if ("c2".equals(selected.consumerName())) { + c2HashesByDispatcher.add(hash); + c1DrainingHashesExpected.compute(hash, (k, v) -> v == null ? 1 : v + 1); + } + } + assertThat(c2HashesByStats).containsExactlyInAnyOrderElementsOf(c2HashesByDispatcher); + + assertThat(c1Stats.getDrainingHashes()).extracting(DrainingHash::getHash) + .containsExactlyInAnyOrderElementsOf(c2HashesByStats); + assertThat(c1Stats.getDrainingHashes()).extracting(DrainingHash::getHash, DrainingHash::getUnackMsgs) + .containsExactlyInAnyOrderElementsOf(c1DrainingHashesExpected.entrySet().stream() + .map(e -> Tuple.tuple(e.getKey(), e.getValue())).toList()); + + assertThat(c2Stats.getDrainingHashes()).isNullOrEmpty(); + + for (int i = 0; i < 20; i++) { + producer.newMessage() + .key(String.valueOf(i % numberOfKeys)) + .value(i) + .send(); + } + + // validate blocked attempts + Awaitility.await().ignoreExceptions().untilAsserted(() -> { + SubscriptionStats stats = + admin.topics().getStats(topic).getSubscriptions().get(subscriptionName); + assertThat(stats.getConsumers().get(0).getDrainingHashes()).isNotEmpty().allSatisfy(dh -> { + assertThat(dh).extracting(DrainingHash::getBlockedAttempts).matches(attempts -> attempts > 0); + }); + }); + + // acknowledging messages that were sent before c2 joined should clear all draining hashes + + // ack 19 messages + for (int i = 0; i < 20; i++) { + Message message = c1.receive(1, TimeUnit.SECONDS); + log.info("Acking message with value {} key {}", message.getValue(), message.getKey()); + c1.acknowledge(message); + if (i == 18) { + // now there should be one draining hash left + Awaitility.await().pollInterval(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(3)) + .untilAsserted(() -> { + SubscriptionStats stats = + admin.topics().getStats(topic).getSubscriptions().get(subscriptionName); + assertThat(stats.getConsumers().get(0)).satisfies(consumerStats -> { + assertThat(consumerStats) + .describedAs("Consumer stats should have one draining hash %s", consumerStats) + .extracting(ConsumerStats::getDrainingHashes) + .asList().hasSize(1); + }); + }); + } + if (i == 19) { + // now there should be no draining hashes left + Awaitility.await().pollInterval(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(3)) + .untilAsserted(() -> { + SubscriptionStats stats = + admin.topics().getStats(topic).getSubscriptions().get(subscriptionName); + assertThat(stats.getConsumers().get(0)).satisfies(consumerStats -> { + assertThat(consumerStats).extracting(ConsumerStats::getDrainingHashes) + .asList().isEmpty(); + }); + }); + } + } + + } + + private String findConsumerNameForHash(SubscriptionStats subscriptionStats, int hash) { + return findConsumerForHash(subscriptionStats, hash).map(ConsumerStats::getConsumerName).orElse(null); + } + + private Optional findConsumerForHash(SubscriptionStats subscriptionStats, int hash) { + return subscriptionStats.getConsumers().stream() + .filter(consumerStats -> consumerStats.getKeyHashRangeArrays().stream() + .anyMatch(hashRanges -> hashRanges[0] <= hash && hashRanges[1] >= hash)) + .findFirst(); + } + + @SneakyThrows + private StickyKeyDispatcher getDispatcher(String topic, String subscription) { + return (StickyKeyDispatcher) pulsar.getBrokerService().getTopicIfExists(topic).get() + .get().getSubscription(subscription).getDispatcher(); + } } From d5795d287faaa37bd9ab136c9192fb6792f7a492 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 10 Oct 2024 02:41:33 +0300 Subject: [PATCH 07/15] Configure debug logging for PIP-379 core classes --- .../broker/service/DrainingHashesTracker.java | 29 ++++++++++++++++++- ...tStickyKeyDispatcherMultipleConsumers.java | 4 +-- .../persistent/RescheduleReadHandler.java | 14 +++++++++ pulsar-broker/src/test/resources/log4j2.xml | 11 +++++++ 4 files changed, 55 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java index 2b2c1770b8a8d..3f8afeb82c657 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java @@ -21,6 +21,7 @@ import static org.apache.pulsar.broker.service.StickyKeyConsumerSelector.STICKY_KEY_HASH_NOT_SET; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.PrimitiveIterator; @@ -120,7 +121,12 @@ public synchronized void addHash(int stickyHash) { public synchronized boolean clearHash(int hash) { drainingHashes.remove(hash); drainingHashesClearedTotal++; - return drainingHashes.isEmpty(); + boolean empty = drainingHashes.isEmpty(); + if (log.isDebugEnabled()) { + log.debug("[{}] Cleared hash {} in stats. empty={} totalCleared={} hashes={}", + dispatcherName, hash, empty, drainingHashesClearedTotal, drainingHashes.getCardinality()); + } + return empty; } public synchronized void updateConsumerStats(Consumer consumer, ConsumerStatsImpl consumerStats) { @@ -179,6 +185,10 @@ public synchronized void addEntry(Consumer consumer, int stickyHash) { } DrainingHashEntry entry = drainingHashes.get(stickyHash); if (entry == null) { + if (log.isDebugEnabled()) { + log.debug("[{}] Adding and incrementing draining hash {} for consumer id:{} name:{}", dispatcherName, + stickyHash, consumer.consumerId(), consumer.consumerName()); + } entry = new DrainingHashEntry(consumer); drainingHashes.put(stickyHash, entry); // update the consumer specific stats @@ -189,6 +199,11 @@ public synchronized void addEntry(Consumer consumer, int stickyHash) { "Consumer " + entry.getConsumer() + " is already draining hash " + stickyHash + " in dispatcher " + dispatcherName + ". Same hash being used for consumer " + consumer + "."); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] Draining hash {} incrementing {} consumer id:{} name:{}", dispatcherName, stickyHash, + entry.refCount + 1, consumer.consumerId(), consumer.consumerName()); + } } entry.incrementRefCount(); } @@ -233,6 +248,10 @@ public synchronized void reduceRefCount(Consumer consumer, int stickyHash, boole + "."); } if (entry.decrementRefCount()) { + if (log.isDebugEnabled()) { + log.debug("[{}] Draining hash {} removing consumer id:{} name:{}", dispatcherName, stickyHash, + consumer.consumerId(), consumer.consumerName()); + } DrainingHashEntry removed = drainingHashes.remove(stickyHash); // update the consumer specific stats consumerDrainingHashesStatsMap.compute(new ConsumerIdentityWrapper(consumer), @@ -243,6 +262,9 @@ public synchronized void reduceRefCount(Consumer consumer, int stickyHash, boole } return consumerDrainingHashesStats; }); + if (log.isDebugEnabled()) { + log.debug("consumerDrainingHashesStatsMap size: {}", consumerDrainingHashesStatsMap.size()); + } if (!closing && removed.isBlocking()) { if (batchLevel > 0) { unblockedWhileBatching = true; @@ -250,6 +272,11 @@ public synchronized void reduceRefCount(Consumer consumer, int stickyHash, boole unblockingHandler.stickyKeyHashUnblocked(stickyHash); } } + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] Draining hash {} decrementing {} consumer id:{} name:{}", dispatcherName, stickyHash, + entry.refCount, consumer.consumerId(), consumer.consumerName()); + } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 4e61a2c0ed38e..1a3e2f706cba8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -351,8 +351,8 @@ private boolean handleAddingPendingAck(Consumer consumer, long ledgerId, long en return false; } if (log.isDebugEnabled()) { - log.debug("[{}] Adding {}:{} to pending acks for consumer {} with sticky key hash {}", - getName(), ledgerId, entryId, consumer, stickyKeyHash); + log.debug("[{}] Adding {}:{} to pending acks for consumer id:{} name:{} with sticky key hash {}", + getName(), ledgerId, entryId, consumer.consumerId(), consumer.consumerName(), stickyKeyHash); } // allow adding the message to pending acks and sending the message to the consumer return true; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/RescheduleReadHandler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/RescheduleReadHandler.java index 3554f29255227..4812be58cdc78 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/RescheduleReadHandler.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/RescheduleReadHandler.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.BooleanSupplier; import java.util.function.LongSupplier; +import lombok.extern.slf4j.Slf4j; /** * Reschedules reads so that the possible pending read is cancelled if it's waiting for more entries. @@ -30,6 +31,7 @@ * that should be handled. This will also batch multiple calls together to reduce the number of * operations. */ +@Slf4j class RescheduleReadHandler { private static final int UNSET = -1; private static final int NO_PENDING_READ = 0; @@ -70,15 +72,27 @@ public void rescheduleRead() { // are entries in the replay queue. if (maxReadOpCount != NO_PENDING_READ && readOpCounterSupplier.getAsLong() == maxReadOpCount && hasEntriesInReplayQueue.getAsBoolean()) { + if (log.isDebugEnabled()) { + log.debug("Cancelling pending read request because it's waiting for more entries"); + } cancelPendingRead.run(); } // Re-schedule read immediately, or join the next scheduled read + if (log.isDebugEnabled()) { + log.debug("Triggering read"); + } rescheduleReadImmediately.run(); }; long rescheduleDelay = readIntervalMsSupplier.getAsLong(); if (rescheduleDelay > 0) { + if (log.isDebugEnabled()) { + log.debug("Scheduling after {} ms", rescheduleDelay); + } executor.schedule(runnable, rescheduleDelay, TimeUnit.MILLISECONDS); } else { + if (log.isDebugEnabled()) { + log.debug("Running immediately"); + } runnable.run(); } } else { diff --git a/pulsar-broker/src/test/resources/log4j2.xml b/pulsar-broker/src/test/resources/log4j2.xml index 09a89702ee2ac..a0732096f2845 100644 --- a/pulsar-broker/src/test/resources/log4j2.xml +++ b/pulsar-broker/src/test/resources/log4j2.xml @@ -36,5 +36,16 @@ + From 5f5db683a48a69420bee7f08ffdcf53828d74e63 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 10 Oct 2024 03:40:53 +0300 Subject: [PATCH 08/15] Fix bug in stats --- .../apache/pulsar/broker/service/DrainingHashesTracker.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java index 3f8afeb82c657..8f9c0dd060f3a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java @@ -338,6 +338,10 @@ public synchronized void clear() { * @param consumerStats the consumer stats to update the values to */ public void updateConsumerStats(Consumer consumer, ConsumerStatsImpl consumerStats) { + consumerStats.drainingHashesCount = 0; + consumerStats.drainingHashesClearedTotal = 0; + consumerStats.drainingHashesUnackedMessages = 0; + consumerStats.drainingHashes = Collections.emptyList(); ConsumerDrainingHashesStats consumerDrainingHashesStats = consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer)); if (consumerDrainingHashesStats != null) { From 9d00334cb393a16741f1693143535b7464ba393a Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 10 Oct 2024 04:04:10 +0300 Subject: [PATCH 09/15] Improve test --- .../broker/stats/ConsumerStatsTest.java | 50 +++++++++++-------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index 5683ebda79cf2..9df0ccc741f01 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -23,6 +23,7 @@ import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric; import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.InstanceOfAssertFactories.INTEGER; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertNotEquals; import static org.testng.AssertJUnit.assertEquals; @@ -496,18 +497,18 @@ public void testNonPersistentTopicSharedSubscriptionUnackedMessages() throws Exc @Test public void testKeySharedDrainingHashesConsumerStats() throws Exception { - conf.setSubscriptionKeySharedUseConsistentHashing(true); - conf.setSubscriptionKeySharedConsistentHashingReplicaPoints(100); String topic = newUniqueName("testKeySharedDrainingHashesConsumerStats"); String subscriptionName = "sub"; int numberOfKeys = 10; + // Create a producer for the topic @Cleanup Producer producer = pulsarClient.newProducer(Schema.INT32) .topic(topic) .enableBatching(false) .create(); + // Create the first consumer (c1) for the topic @Cleanup Consumer c1 = pulsarClient.newConsumer(Schema.INT32) .topic(topic) @@ -517,9 +518,11 @@ public void testKeySharedDrainingHashesConsumerStats() throws Exception { .subscriptionType(SubscriptionType.Key_Shared) .subscribe(); + // Get the dispatcher and selector for the topic StickyKeyDispatcher dispatcher = getDispatcher(topic, subscriptionName); StickyKeyConsumerSelector selector = dispatcher.getSelector(); + // Send 20 messages with keys cycling from 0 to numberOfKeys-1 for (int i = 0; i < 20; i++) { String key = String.valueOf(i % numberOfKeys); int stickyKeyHash = selector.makeStickyKeyHash(key.getBytes(StandardCharsets.UTF_8)); @@ -530,11 +533,11 @@ public void testKeySharedDrainingHashesConsumerStats() throws Exception { .send(); } + // Wait until all the already published messages have been pre-fetched by c1 PendingAcksMap c1PendingAcks = dispatcher.getConsumers().get(0).getPendingAcks(); - // Wait until all the already published messages have been pre-fetched by C1. Awaitility.await().ignoreExceptions().until(() -> c1PendingAcks.size() == 20); - // Adding a new consumer. + // Add a new consumer (c2) for the topic @Cleanup Consumer c2 = pulsarClient.newConsumer(Schema.INT32) .topic(topic) @@ -543,39 +546,46 @@ public void testKeySharedDrainingHashesConsumerStats() throws Exception { .subscriptionType(SubscriptionType.Key_Shared) .subscribe(); - //BrokerTestUtil.logTopicStats(log, pulsar.getWebServiceAddress(), topic); - - SubscriptionStats subscriptionStats = - admin.topics().getStats(topic).getSubscriptions().get(subscriptionName); + // Get the subscription stats and consumer stats + SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName); ConsumerStats c1Stats = subscriptionStats.getConsumers().get(0); ConsumerStats c2Stats = subscriptionStats.getConsumers().get(1); Set c2HashesByStats = new HashSet<>(); Set c2HashesByDispatcher = new HashSet<>(); - Map c1DrainingHashesExpected = new HashMap<>(); + + // Determine which hashes are assigned to c2 and which are draining from c1 for (int i = 0; i < 20; i++) { + // use the same key as in the sent messages String key = String.valueOf(i % numberOfKeys); int hash = selector.makeStickyKeyHash(key.getBytes(StandardCharsets.UTF_8)); + // Validate that the hash is assigned to c2 in stats if ("c2".equals(findConsumerNameForHash(subscriptionStats, hash))) { c2HashesByStats.add(hash); } + // use the selector to determine the expected draining hashes for c1 org.apache.pulsar.broker.service.Consumer selected = selector.select(hash); if ("c2".equals(selected.consumerName())) { c2HashesByDispatcher.add(hash); c1DrainingHashesExpected.compute(hash, (k, v) -> v == null ? 1 : v + 1); } } + + // Validate that the hashes assigned to c2 match between stats and dispatcher assertThat(c2HashesByStats).containsExactlyInAnyOrderElementsOf(c2HashesByDispatcher); + // Validate the draining hashes for c1 assertThat(c1Stats.getDrainingHashes()).extracting(DrainingHash::getHash) .containsExactlyInAnyOrderElementsOf(c2HashesByStats); assertThat(c1Stats.getDrainingHashes()).extracting(DrainingHash::getHash, DrainingHash::getUnackMsgs) .containsExactlyInAnyOrderElementsOf(c1DrainingHashesExpected.entrySet().stream() .map(e -> Tuple.tuple(e.getKey(), e.getValue())).toList()); - assertThat(c2Stats.getDrainingHashes()).isNullOrEmpty(); + // Validate that c2 has no draining hashes + assertThat(c2Stats.getDrainingHashes()).isEmpty(); + // Send another 20 messages for (int i = 0; i < 20; i++) { producer.newMessage() .key(String.valueOf(i % numberOfKeys)) @@ -583,24 +593,24 @@ public void testKeySharedDrainingHashesConsumerStats() throws Exception { .send(); } - // validate blocked attempts + // Validate blocked attempts for c1 Awaitility.await().ignoreExceptions().untilAsserted(() -> { - SubscriptionStats stats = - admin.topics().getStats(topic).getSubscriptions().get(subscriptionName); + SubscriptionStats stats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName); assertThat(stats.getConsumers().get(0).getDrainingHashes()).isNotEmpty().allSatisfy(dh -> { - assertThat(dh).extracting(DrainingHash::getBlockedAttempts).matches(attempts -> attempts > 0); + assertThat(dh).extracting(DrainingHash::getBlockedAttempts) + .asInstanceOf(INTEGER) + .isGreaterThan(0); }); }); - // acknowledging messages that were sent before c2 joined should clear all draining hashes - - // ack 19 messages + // Acknowledge messages that were sent before c2 joined, to clear all draining hashes for (int i = 0; i < 20; i++) { Message message = c1.receive(1, TimeUnit.SECONDS); log.info("Acking message with value {} key {}", message.getValue(), message.getKey()); c1.acknowledge(message); + if (i == 18) { - // now there should be one draining hash left + // Validate that there is one draining hash left Awaitility.await().pollInterval(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(3)) .untilAsserted(() -> { SubscriptionStats stats = @@ -613,8 +623,9 @@ public void testKeySharedDrainingHashesConsumerStats() throws Exception { }); }); } + if (i == 19) { - // now there should be no draining hashes left + // Validate that there are no draining hashes left Awaitility.await().pollInterval(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(3)) .untilAsserted(() -> { SubscriptionStats stats = @@ -626,7 +637,6 @@ public void testKeySharedDrainingHashesConsumerStats() throws Exception { }); } } - } private String findConsumerNameForHash(SubscriptionStats subscriptionStats, int hash) { From 55db9c21007c2f2e59233a5df4e2b82c6c03d596 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 10 Oct 2024 04:16:59 +0300 Subject: [PATCH 10/15] Validate counters --- .../broker/stats/ConsumerStatsTest.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index 9df0ccc741f01..3481b09cf66e0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -555,7 +555,9 @@ public void testKeySharedDrainingHashesConsumerStats() throws Exception { Set c2HashesByDispatcher = new HashSet<>(); Map c1DrainingHashesExpected = new HashMap<>(); + int expectedDrainingHashesUnackMessages = 0; // Determine which hashes are assigned to c2 and which are draining from c1 + // run for the same keys as the sent messages for (int i = 0; i < 20; i++) { // use the same key as in the sent messages String key = String.valueOf(i % numberOfKeys); @@ -569,6 +571,7 @@ public void testKeySharedDrainingHashesConsumerStats() throws Exception { if ("c2".equals(selected.consumerName())) { c2HashesByDispatcher.add(hash); c1DrainingHashesExpected.compute(hash, (k, v) -> v == null ? 1 : v + 1); + expectedDrainingHashesUnackMessages++; } } @@ -585,6 +588,14 @@ public void testKeySharedDrainingHashesConsumerStats() throws Exception { // Validate that c2 has no draining hashes assertThat(c2Stats.getDrainingHashes()).isEmpty(); + // Validate counters + assertThat(c1Stats.getDrainingHashesCount()).isEqualTo(c2HashesByStats.size()); + assertThat(c1Stats.getDrainingHashesClearedTotal()).isEqualTo(0); + assertThat(c1Stats.getDrainingHashesUnackedMessages()).isEqualTo(expectedDrainingHashesUnackMessages); + assertThat(c2Stats.getDrainingHashesCount()).isEqualTo(0); + assertThat(c2Stats.getDrainingHashesClearedTotal()).isEqualTo(0); + assertThat(c2Stats.getDrainingHashesUnackedMessages()).isEqualTo(0); + // Send another 20 messages for (int i = 0; i < 20; i++) { producer.newMessage() @@ -637,6 +648,20 @@ public void testKeySharedDrainingHashesConsumerStats() throws Exception { }); } } + + // Get the subscription stats and consumer stats + subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName); + c1Stats = subscriptionStats.getConsumers().get(0); + c2Stats = subscriptionStats.getConsumers().get(1); + + // Validate counters + assertThat(c1Stats.getDrainingHashesCount()).isEqualTo(0); + assertThat(c1Stats.getDrainingHashesClearedTotal()).isEqualTo(c2HashesByStats.size()); + assertThat(c1Stats.getDrainingHashesUnackedMessages()).isEqualTo(0); + assertThat(c2Stats.getDrainingHashesCount()).isEqualTo(0); + assertThat(c2Stats.getDrainingHashesClearedTotal()).isEqualTo(0); + assertThat(c2Stats.getDrainingHashesUnackedMessages()).isEqualTo(0); + } private String findConsumerNameForHash(SubscriptionStats subscriptionStats, int hash) { From 38c1b79fa6d8de554f2188d1d55c3cf1efd2fbe5 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 10 Oct 2024 04:22:12 +0300 Subject: [PATCH 11/15] Fix bug in counter, don't remove stats for consumer --- .../broker/service/DrainingHashesTracker.java | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java index 8f9c0dd060f3a..46762c844db6c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java @@ -254,16 +254,10 @@ public synchronized void reduceRefCount(Consumer consumer, int stickyHash, boole } DrainingHashEntry removed = drainingHashes.remove(stickyHash); // update the consumer specific stats - consumerDrainingHashesStatsMap.compute(new ConsumerIdentityWrapper(consumer), - (key, consumerDrainingHashesStats) -> { - if (consumerDrainingHashesStats != null && consumerDrainingHashesStats.clearHash(stickyHash)) { - // remove the consumer specific stats if all hashes are cleared - return null; - } - return consumerDrainingHashesStats; - }); - if (log.isDebugEnabled()) { - log.debug("consumerDrainingHashesStatsMap size: {}", consumerDrainingHashesStatsMap.size()); + ConsumerDrainingHashesStats drainingHashesStats = + consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer)); + if (drainingHashesStats != null) { + drainingHashesStats.clearHash(stickyHash); } if (!closing && removed.isBlocking()) { if (batchLevel > 0) { From 7d5db1eaaec8e19023894ad7cea9a358be35446a Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 10 Oct 2024 05:16:51 +0300 Subject: [PATCH 12/15] Fix test for checking allowed fields --- .../broker/stats/ConsumerStatsTest.java | 73 ++++++++++++++----- 1 file changed, 56 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index 3481b09cf66e0..30110199196bd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -39,7 +39,6 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -53,6 +52,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.PrometheusMetricsTestUtil; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.PendingAcksMap; import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; @@ -88,6 +88,7 @@ import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j @@ -232,9 +233,21 @@ public void testUpdateStatsForActiveConsumerAndSubscription() throws Exception { Assert.assertEquals(updatedStats.getBytesOutCounter(), 1280); } - @Test - public void testConsumerStatsOutput() throws Exception { - Set allowedFields = Sets.newHashSet( + @DataProvider(name = "classicAndSubscriptionType") + public Object[][] classicAndSubscriptionType() { + return new Object[][]{ + {false, SubscriptionType.Shared}, + {true, SubscriptionType.Key_Shared}, + {false, SubscriptionType.Key_Shared} + }; + } + + @Test(dataProvider = "classicAndSubscriptionType") + public void testConsumerStatsOutput(boolean classicDispatchers, SubscriptionType subscriptionType) + throws Exception { + conf.setSubscriptionSharedUseClassicPersistentImplementation(classicDispatchers); + conf.setSubscriptionKeySharedUseClassicPersistentImplementation(classicDispatchers); + Set expectedFields = Sets.newHashSet( "msgRateOut", "msgThroughputOut", "bytesOutCounter", @@ -247,21 +260,56 @@ public void testConsumerStatsOutput() throws Exception { "unackedMessages", "avgMessagesPerEntry", "blockedConsumerOnUnackedMsgs", - "readPositionWhenJoining", "lastAckedTime", "lastAckedTimestamp", "lastConsumedTime", "lastConsumedTimestamp", "lastConsumedFlowTimestamp", - "keyHashRanges", "metadata", "address", "connectedSince", - "clientVersion"); + "clientVersion", + "drainingHashesCount", + "drainingHashesClearedTotal", + "drainingHashesUnackedMessages" + ); + if (subscriptionType == SubscriptionType.Key_Shared) { + if (classicDispatchers) { + expectedFields.addAll(List.of( + "readPositionWhenJoining", + "keyHashRanges" + )); + } else { + expectedFields.addAll(List.of( + "drainingHashes", + "keyHashRangeArrays" + )); + } + } + final String topicName = newUniqueName("persistent://my-property/my-ns/testConsumerStatsOutput"); + final String subName = "my-subscription"; - final String topicName = "persistent://prop/use/ns-abc/testConsumerStatsOutput"; + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionType(subscriptionType) + .subscriptionName(subName) + .subscribe(); + + String topicStatsUri = + String.format("%s/admin/v2/%s/stats", pulsar.getWebServiceAddress(), topicName.replace("://", "/")); + String topicStatsJson = BrokerTestUtil.getJsonResourceAsString(topicStatsUri); + ObjectMapper mapper = ObjectMapperFactory.create(); + JsonNode node = mapper.readTree(topicStatsJson).get("subscriptions").get(subName).get("consumers").get(0); + assertThat(node.fieldNames()).toIterable().containsExactlyInAnyOrderElementsOf(expectedFields); + } + + @Test + public void testLastConsumerFlowTimestamp() throws PulsarClientException, PulsarAdminException { + final String topicName = newUniqueName("persistent://my-property/my-ns/testLastConsumerFlowTimestamp"); final String subName = "my-subscription"; + @Cleanup Consumer consumer = pulsarClient.newConsumer() .topic(topicName) .subscriptionType(SubscriptionType.Shared) @@ -269,18 +317,9 @@ public void testConsumerStatsOutput() throws Exception { .subscribe(); TopicStats stats = admin.topics().getStats(topicName); - ObjectMapper mapper = ObjectMapperFactory.create(); ConsumerStats consumerStats = stats.getSubscriptions() .get(subName).getConsumers().get(0); Assert.assertTrue(consumerStats.getLastConsumedFlowTimestamp() > 0); - JsonNode node = mapper.readTree(mapper.writer().writeValueAsString(consumerStats)); - Iterator itr = node.fieldNames(); - while (itr.hasNext()) { - String field = itr.next(); - Assert.assertTrue(allowedFields.contains(field), field + " should not be exposed"); - } - - consumer.close(); } From 84c32c201f4c0ebde20524121b2933a3d92e56ed Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 10 Oct 2024 06:58:20 +0300 Subject: [PATCH 13/15] Fix test --- .../stats/AuthenticatedConsumerStatsTest.java | 57 ++++--------------- 1 file changed, 10 insertions(+), 47 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java index e8cadb72e1e04..20c1c5498ce6d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java @@ -18,11 +18,19 @@ */ package org.apache.pulsar.broker.stats; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Sets; import io.jsonwebtoken.Jwts; import io.jsonwebtoken.SignatureAlgorithm; +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.NoSuchAlgorithmException; +import java.security.PrivateKey; +import java.time.Duration; +import java.util.Base64; +import java.util.Date; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.AuthenticationFactory; @@ -37,18 +45,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.security.KeyPair; -import java.security.KeyPairGenerator; -import java.security.NoSuchAlgorithmException; -import java.security.PrivateKey; -import java.time.Duration; -import java.util.Base64; -import java.util.Date; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Properties; -import java.util.Set; - public class AuthenticatedConsumerStatsTest extends ConsumerStatsTest{ private final String ADMIN_TOKEN; private final String TOKEN_PUBLIC_KEY; @@ -115,32 +111,6 @@ protected void setup() throws Exception { @Test public void testConsumerStatsOutput() throws Exception { - Set allowedFields = Sets.newHashSet( - "msgRateOut", - "msgThroughputOut", - "bytesOutCounter", - "msgOutCounter", - "messageAckRate", - "msgRateRedeliver", - "chunkedMessageRate", - "consumerName", - "availablePermits", - "unackedMessages", - "avgMessagesPerEntry", - "blockedConsumerOnUnackedMsgs", - "readPositionWhenJoining", - "lastAckedTime", - "lastAckedTimestamp", - "lastConsumedTime", - "lastConsumedTimestamp", - "lastConsumedFlowTimestamp", - "keyHashRanges", - "metadata", - "address", - "connectedSince", - "clientVersion", - "appId"); - final String topicName = "persistent://public/default/testConsumerStatsOutput"; final String subName = "my-subscription"; @@ -154,13 +124,6 @@ public void testConsumerStatsOutput() throws Exception { ObjectMapper mapper = ObjectMapperFactory.create(); ConsumerStats consumerStats = stats.getSubscriptions() .get(subName).getConsumers().get(0); - Assert.assertTrue(consumerStats.getLastConsumedFlowTimestamp() > 0); - JsonNode node = mapper.readTree(mapper.writer().writeValueAsString(consumerStats)); - Iterator itr = node.fieldNames(); - while (itr.hasNext()) { - String field = itr.next(); - Assert.assertTrue(allowedFields.contains(field), field + " should not be exposed"); - } // assert that role is exposed Assert.assertEquals(consumerStats.getAppId(), "admin"); consumer.close(); From 73c97c82ceeeb08308992baee57e59a07291e1b7 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 10 Oct 2024 07:41:02 +0300 Subject: [PATCH 14/15] Fix test --- .../org/apache/pulsar/broker/stats/ConsumerStatsTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index 30110199196bd..59a911500e5d9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -86,6 +86,7 @@ import org.assertj.core.groups.Tuple; import org.awaitility.Awaitility; import org.testng.Assert; +import org.testng.SkipException; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -245,6 +246,9 @@ public Object[][] classicAndSubscriptionType() { @Test(dataProvider = "classicAndSubscriptionType") public void testConsumerStatsOutput(boolean classicDispatchers, SubscriptionType subscriptionType) throws Exception { + if (this instanceof AuthenticatedConsumerStatsTest) { + throw new SkipException("Skip test for AuthenticatedConsumerStatsTest"); + } conf.setSubscriptionSharedUseClassicPersistentImplementation(classicDispatchers); conf.setSubscriptionKeySharedUseClassicPersistentImplementation(classicDispatchers); Set expectedFields = Sets.newHashSet( From 87af5f40baf37644dfd73916d191557bfbd4e4ce Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 10 Oct 2024 08:04:04 +0300 Subject: [PATCH 15/15] Use pinned trivy version --- .github/workflows/pulsar-ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml index 47a39bef9c908..bf44c51b6ad02 100644 --- a/.github/workflows/pulsar-ci.yaml +++ b/.github/workflows/pulsar-ci.yaml @@ -894,7 +894,7 @@ jobs: - name: Run Trivy container scan id: trivy_scan - uses: aquasecurity/trivy-action@master + uses: aquasecurity/trivy-action@0.26.0 if: ${{ github.repository == 'apache/pulsar' && github.event_name != 'pull_request' }} continue-on-error: true with: