From 4f3cc6c5d277b334b3a6868f9fc641648cd952a3 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Wed, 1 May 2024 11:17:19 -0700 Subject: [PATCH] [feat][broker] PIP-264: Add topic messaging metrics (#22467) --- .../mledger/ManagedLedgerMXBean.java | 10 + .../mledger/impl/ManagedLedgerMBeanImpl.java | 10 + .../mledger/impl/ManagedLedgerMBeanTest.java | 8 + .../apache/pulsar/broker/PulsarService.java | 8 + .../pulsar/broker/service/AbstractTopic.java | 8 + .../broker/stats/OpenTelemetryTopicStats.java | 490 ++++++++++++++++++ .../broker/stats/prometheus/TopicStats.java | 17 + .../pulsar/compaction/CompactionRecord.java | 12 + .../broker/admin/AdminApiOffloadTest.java | 31 +- .../auth/MockedPulsarServiceBaseTest.java | 10 + .../service/BacklogQuotaManagerTest.java | 59 ++- .../service/BrokerServiceThrottlingTest.java | 16 +- .../persistent/DelayedDeliveryTest.java | 21 + .../stats/BrokerOpenTelemetryTestUtil.java | 92 ++++ .../stats/OpenTelemetryTopicStatsTest.java | 145 ++++++ .../broker/testcontext/PulsarTestContext.java | 10 +- .../broker/transaction/TransactionTest.java | 34 +- .../transaction/TransactionTestBase.java | 1 + .../client/api/BrokerServiceLookupTest.java | 3 + .../pulsar/compaction/CompactorTest.java | 45 +- .../OpenTelemetryAttributes.java | 40 ++ 21 files changed, 1039 insertions(+), 31 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStatsTest.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java index cb6d3700afe3a..44345c430b7fb 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java @@ -85,6 +85,11 @@ public interface ManagedLedgerMXBean { */ long getAddEntrySucceed(); + /** + * @return the total number of addEntry requests that succeeded + */ + long getAddEntrySucceedTotal(); + /** * @return the number of addEntry requests that failed */ @@ -100,6 +105,11 @@ public interface ManagedLedgerMXBean { */ long getReadEntriesSucceeded(); + /** + * @return the total number of readEntries requests that succeeded + */ + long getReadEntriesSucceededTotal(); + /** * @return the number of readEntries requests that failed */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java index 3935828ff3d80..5e5161a29ca79 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java @@ -230,6 +230,11 @@ public long getAddEntrySucceed() { return addEntryOps.getCount(); } + @Override + public long getAddEntrySucceedTotal() { + return addEntryOps.getTotalCount(); + } + @Override public long getAddEntryErrors() { return addEntryOpsFailed.getCount(); @@ -240,6 +245,11 @@ public long getReadEntriesSucceeded() { return readEntriesOps.getCount(); } + @Override + public long getReadEntriesSucceededTotal() { + return readEntriesOps.getTotalCount(); + } + @Override public long getReadEntriesErrors() { return readEntriesOpsFailed.getCount(); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java index 2505db6ec55d7..5f6bd0b7ae64d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java @@ -77,10 +77,12 @@ public void simple() throws Exception { assertEquals(mbean.getAddEntryWithReplicasBytesRate(), 0.0); assertEquals(mbean.getAddEntryMessagesRate(), 0.0); assertEquals(mbean.getAddEntrySucceed(), 0); + assertEquals(mbean.getAddEntrySucceedTotal(), 0); assertEquals(mbean.getAddEntryErrors(), 0); assertEquals(mbean.getReadEntriesBytesRate(), 0.0); assertEquals(mbean.getReadEntriesRate(), 0.0); assertEquals(mbean.getReadEntriesSucceeded(), 0); + assertEquals(mbean.getReadEntriesSucceededTotal(), 0); assertEquals(mbean.getReadEntriesErrors(), 0); assertEquals(mbean.getMarkDeleteRate(), 0.0); @@ -105,10 +107,12 @@ public void simple() throws Exception { assertEquals(mbean.getAddEntryWithReplicasBytesRate(), 1600.0); assertEquals(mbean.getAddEntryMessagesRate(), 2.0); assertEquals(mbean.getAddEntrySucceed(), 2); + assertEquals(mbean.getAddEntrySucceedTotal(), 2); assertEquals(mbean.getAddEntryErrors(), 0); assertEquals(mbean.getReadEntriesBytesRate(), 0.0); assertEquals(mbean.getReadEntriesRate(), 0.0); assertEquals(mbean.getReadEntriesSucceeded(), 0); + assertEquals(mbean.getReadEntriesSucceededTotal(), 0); assertEquals(mbean.getReadEntriesErrors(), 0); assertTrue(mbean.getMarkDeleteRate() > 0.0); @@ -134,10 +138,14 @@ public void simple() throws Exception { assertEquals(mbean.getReadEntriesBytesRate(), 600.0); assertEquals(mbean.getReadEntriesRate(), 1.0); assertEquals(mbean.getReadEntriesSucceeded(), 1); + assertEquals(mbean.getReadEntriesSucceededTotal(), 1); assertEquals(mbean.getReadEntriesErrors(), 0); assertEquals(mbean.getNumberOfMessagesInBacklog(), 1); assertEquals(mbean.getMarkDeleteRate(), 0.0); + assertEquals(mbean.getAddEntrySucceed(), 0); + assertEquals(mbean.getAddEntrySucceedTotal(), 2); + factory.shutdown(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 96f3653ea9966..8c910fb91e109 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -109,6 +109,7 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.service.schema.SchemaStorageFactory; import org.apache.pulsar.broker.stats.MetricsGenerator; +import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats; import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet; import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; @@ -252,6 +253,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private MetricsGenerator metricsGenerator; private final PulsarBrokerOpenTelemetry openTelemetry; + private OpenTelemetryTopicStats openTelemetryTopicStats; private TransactionMetadataStoreService transactionMetadataStoreService; private TransactionBufferProvider transactionBufferProvider; @@ -631,6 +633,10 @@ public CompletableFuture closeAsync() { brokerClientSharedTimer.stop(); monotonicSnapshotClock.close(); + if (openTelemetryTopicStats != null) { + openTelemetryTopicStats.close(); + } + asyncCloseFutures.add(EventLoopUtil.shutdownGracefully(ioEventLoopGroup)); @@ -771,6 +777,8 @@ public void start() throws PulsarServerException { config.getDefaultRetentionTimeInMinutes() * 60)); } + openTelemetryTopicStats = new OpenTelemetryTopicStats(this); + localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic()) ? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic()) : null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 44a4ca42cea46..b6ce43b060c6f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -133,6 +133,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener RATE_LIMITED_UPDATER = AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "publishRateLimitedTimes"); protected volatile long publishRateLimitedTimes = 0L; + private static final AtomicLongFieldUpdater TOTAL_RATE_LIMITED_UPDATER = + AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "totalPublishRateLimitedCounter"); + protected volatile long totalPublishRateLimitedCounter = 0L; private static final AtomicIntegerFieldUpdater USER_CREATED_PRODUCER_COUNTER_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractTopic.class, "userCreatedProducerCount"); @@ -897,6 +900,7 @@ public void recordAddLatency(long latency, TimeUnit unit) { @Override public long increasePublishLimitedTimes() { + TOTAL_RATE_LIMITED_UPDATER.incrementAndGet(this); return RATE_LIMITED_UPDATER.incrementAndGet(this); } @@ -1185,6 +1189,10 @@ public long getBytesOutCounter() { + sumSubscriptions(AbstractSubscription::getBytesOutCounter); } + public long getTotalPublishRateLimitCounter() { + return TOTAL_RATE_LIMITED_UPDATER.get(this); + } + private long sumSubscriptions(ToLongFunction toCounter) { return getSubscriptions().values().stream() .map(AbstractSubscription.class::cast) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java new file mode 100644 index 0000000000000..1f0735c0ec1f7 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.BatchCallback; +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.AbstractTopic; +import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.BacklogQuota; +import org.apache.pulsar.common.stats.MetricsUtil; +import org.apache.pulsar.compaction.CompactedTopicContext; +import org.apache.pulsar.compaction.Compactor; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; + +public class OpenTelemetryTopicStats implements AutoCloseable { + + // Replaces pulsar_subscriptions_count + public static final String SUBSCRIPTION_COUNTER = "pulsar.broker.topic.subscription.count"; + private final ObservableLongMeasurement subscriptionCounter; + + // Replaces pulsar_producers_count + public static final String PRODUCER_COUNTER = "pulsar.broker.topic.producer.count"; + private final ObservableLongMeasurement producerCounter; + + // Replaces pulsar_consumers_count + public static final String CONSUMER_COUNTER = "pulsar.broker.topic.consumer.count"; + private final ObservableLongMeasurement consumerCounter; + + // Replaces ['pulsar_rate_in', 'pulsar_in_messages_total'] + public static final String MESSAGE_IN_COUNTER = "pulsar.broker.topic.message.incoming.count"; + private final ObservableLongMeasurement messageInCounter; + + // Replaces ['pulsar_rate_out', 'pulsar_out_messages_total'] + public static final String MESSAGE_OUT_COUNTER = "pulsar.broker.topic.message.outgoing.count"; + private final ObservableLongMeasurement messageOutCounter; + + // Replaces ['pulsar_throughput_in', 'pulsar_in_bytes_total'] + public static final String BYTES_IN_COUNTER = "pulsar.broker.topic.message.incoming.size"; + private final ObservableLongMeasurement bytesInCounter; + + // Replaces ['pulsar_throughput_out', 'pulsar_out_bytes_total'] + public static final String BYTES_OUT_COUNTER = "pulsar.broker.topic.message.outgoing.size"; + private final ObservableLongMeasurement bytesOutCounter; + + // Replaces pulsar_publish_rate_limit_times + public static final String PUBLISH_RATE_LIMIT_HIT_COUNTER = "pulsar.broker.topic.publish.rate.limit.count"; + private final ObservableLongMeasurement publishRateLimitHitCounter; + + // Omitted: pulsar_consumer_msg_ack_rate + + // Replaces pulsar_storage_size + public static final String STORAGE_COUNTER = "pulsar.broker.topic.storage.size"; + private final ObservableLongMeasurement storageCounter; + + // Replaces pulsar_storage_logical_size + public static final String STORAGE_LOGICAL_COUNTER = "pulsar.broker.topic.storage.logical.size"; + private final ObservableLongMeasurement storageLogicalCounter; + + // Replaces pulsar_storage_backlog_size + public static final String STORAGE_BACKLOG_COUNTER = "pulsar.broker.topic.storage.backlog.size"; + private final ObservableLongMeasurement storageBacklogCounter; + + // Replaces pulsar_storage_offloaded_size + public static final String STORAGE_OFFLOADED_COUNTER = "pulsar.broker.topic.storage.offloaded.size"; + private final ObservableLongMeasurement storageOffloadedCounter; + + // Replaces pulsar_storage_backlog_quota_limit + public static final String BACKLOG_QUOTA_LIMIT_SIZE = "pulsar.broker.topic.storage.backlog.quota.limit.size"; + private final ObservableLongMeasurement backlogQuotaLimitSize; + + // Replaces pulsar_storage_backlog_quota_limit_time + public static final String BACKLOG_QUOTA_LIMIT_TIME = "pulsar.broker.topic.storage.backlog.quota.limit.time"; + private final ObservableLongMeasurement backlogQuotaLimitTime; + + // Replaces pulsar_storage_backlog_quota_exceeded_evictions_total + public static final String BACKLOG_EVICTION_COUNTER = "pulsar.broker.topic.storage.backlog.quota.eviction.count"; + private final ObservableLongMeasurement backlogEvictionCounter; + + // Replaces pulsar_storage_backlog_age_seconds + public static final String BACKLOG_QUOTA_AGE = "pulsar.broker.topic.storage.backlog.age"; + private final ObservableLongMeasurement backlogQuotaAge; + + // Replaces pulsar_storage_write_rate + public static final String STORAGE_OUT_COUNTER = "pulsar.broker.topic.storage.entry.outgoing.count"; + private final ObservableLongMeasurement storageOutCounter; + + // Replaces pulsar_storage_read_rate + public static final String STORAGE_IN_COUNTER = "pulsar.broker.topic.storage.entry.incoming.count"; + private final ObservableLongMeasurement storageInCounter; + + // Omitted: pulsar_storage_write_latency_le_* + + // Omitted: pulsar_entry_size_le_* + + // Replaces pulsar_compaction_removed_event_count + public static final String COMPACTION_REMOVED_COUNTER = "pulsar.broker.topic.compaction.removed.message.count"; + private final ObservableLongMeasurement compactionRemovedCounter; + + // Replaces ['pulsar_compaction_succeed_count', 'pulsar_compaction_failed_count'] + public static final String COMPACTION_OPERATION_COUNTER = "pulsar.broker.topic.compaction.operation.count"; + private final ObservableLongMeasurement compactionOperationCounter; + + // Replaces pulsar_compaction_duration_time_in_mills + public static final String COMPACTION_DURATION_SECONDS = "pulsar.broker.topic.compaction.duration"; + private final ObservableDoubleMeasurement compactionDurationSeconds; + + // Replaces pulsar_compaction_read_throughput + public static final String COMPACTION_BYTES_IN_COUNTER = "pulsar.broker.topic.compaction.incoming.size"; + private final ObservableLongMeasurement compactionBytesInCounter; + + // Replaces pulsar_compaction_write_throughput + public static final String COMPACTION_BYTES_OUT_COUNTER = "pulsar.broker.topic.compaction.outgoing.size"; + private final ObservableLongMeasurement compactionBytesOutCounter; + + // Omitted: pulsar_compaction_latency_le_* + + // Replaces pulsar_compaction_compacted_entries_count + public static final String COMPACTION_ENTRIES_COUNTER = "pulsar.broker.topic.compaction.compacted.entry.count"; + private final ObservableLongMeasurement compactionEntriesCounter; + + // Replaces pulsar_compaction_compacted_entries_size + public static final String COMPACTION_BYTES_COUNTER = "pulsar.broker.topic.compaction.compacted.entry.size"; + private final ObservableLongMeasurement compactionBytesCounter; + + // Replaces ['pulsar_txn_tb_active_total', 'pulsar_txn_tb_aborted_total', 'pulsar_txn_tb_committed_total'] + public static final String TRANSACTION_COUNTER = "pulsar.broker.topic.transaction.count"; + private final ObservableLongMeasurement transactionCounter; + + // Replaces pulsar_subscription_delayed + public static final String DELAYED_SUBSCRIPTION_COUNTER = "pulsar.broker.topic.subscription.delayed.entry.count"; + private final ObservableLongMeasurement delayedSubscriptionCounter; + + // Omitted: pulsar_delayed_message_index_size_bytes + + // Omitted: pulsar_delayed_message_index_bucket_total + + // Omitted: pulsar_delayed_message_index_loaded + + // Omitted: pulsar_delayed_message_index_bucket_snapshot_size_bytes + + // Omitted: pulsar_delayed_message_index_bucket_op_count + + // Omitted: pulsar_delayed_message_index_bucket_op_latency_ms + + + private final BatchCallback batchCallback; + private final PulsarService pulsar; + + public OpenTelemetryTopicStats(PulsarService pulsar) { + this.pulsar = pulsar; + var meter = pulsar.getOpenTelemetry().getMeter(); + + subscriptionCounter = meter + .upDownCounterBuilder(SUBSCRIPTION_COUNTER) + .setUnit("{subscription}") + .setDescription("The number of Pulsar subscriptions of the topic served by this broker.") + .buildObserver(); + + producerCounter = meter + .upDownCounterBuilder(PRODUCER_COUNTER) + .setUnit("{producer}") + .setDescription("The number of active producers of the topic connected to this broker.") + .buildObserver(); + + consumerCounter = meter + .upDownCounterBuilder(CONSUMER_COUNTER) + .setUnit("{consumer}") + .setDescription("The number of active consumers of the topic connected to this broker.") + .buildObserver(); + + messageInCounter = meter + .counterBuilder(MESSAGE_IN_COUNTER) + .setUnit("{message}") + .setDescription("The total number of messages received for this topic.") + .buildObserver(); + + messageOutCounter = meter + .counterBuilder(MESSAGE_OUT_COUNTER) + .setUnit("{message}") + .setDescription("The total number of messages read from this topic.") + .buildObserver(); + + bytesInCounter = meter + .counterBuilder(BYTES_IN_COUNTER) + .setUnit("By") + .setDescription("The total number of messages bytes received for this topic.") + .buildObserver(); + + bytesOutCounter = meter + .counterBuilder(BYTES_OUT_COUNTER) + .setUnit("By") + .setDescription("The total number of messages bytes read from this topic.") + .buildObserver(); + + publishRateLimitHitCounter = meter + .counterBuilder(PUBLISH_RATE_LIMIT_HIT_COUNTER) + .setUnit("{event}") + .setDescription("The number of times the publish rate limit is triggered.") + .buildObserver(); + + storageCounter = meter + .upDownCounterBuilder(STORAGE_COUNTER) + .setUnit("By") + .setDescription( + "The total storage size of the messages in this topic, including storage used by replicas.") + .buildObserver(); + + storageLogicalCounter = meter + .upDownCounterBuilder(STORAGE_LOGICAL_COUNTER) + .setUnit("By") + .setDescription("The storage size of the messages in this topic, excluding storage used by replicas.") + .buildObserver(); + + storageBacklogCounter = meter + .upDownCounterBuilder(STORAGE_BACKLOG_COUNTER) + .setUnit("By") + .setDescription("The size of the backlog storage for this topic.") + .buildObserver(); + + storageOffloadedCounter = meter + .upDownCounterBuilder(STORAGE_OFFLOADED_COUNTER) + .setUnit("By") + .setDescription("The total amount of the data in this topic offloaded to the tiered storage.") + .buildObserver(); + + backlogQuotaLimitSize = meter + .upDownCounterBuilder(BACKLOG_QUOTA_LIMIT_SIZE) + .setUnit("By") + .setDescription("The size based backlog quota limit for this topic.") + .buildObserver(); + + backlogQuotaLimitTime = meter + .gaugeBuilder(BACKLOG_QUOTA_LIMIT_TIME) + .ofLongs() + .setUnit("s") + .setDescription("The time based backlog quota limit for this topic.") + .buildObserver(); + + backlogEvictionCounter = meter + .counterBuilder(BACKLOG_EVICTION_COUNTER) + .setUnit("{eviction}") + .setDescription("The number of times a backlog was evicted since it has exceeded its quota.") + .buildObserver(); + + backlogQuotaAge = meter + .gaugeBuilder(BACKLOG_QUOTA_AGE) + .ofLongs() + .setUnit("s") + .setDescription("The age of the oldest unacknowledged message (backlog).") + .buildObserver(); + + storageOutCounter = meter + .counterBuilder(STORAGE_OUT_COUNTER) + .setUnit("{entry}") + .setDescription("The total message batches (entries) written to the storage for this topic.") + .buildObserver(); + + storageInCounter = meter + .counterBuilder(STORAGE_IN_COUNTER) + .setUnit("{entry}") + .setDescription("The total message batches (entries) read from the storage for this topic.") + .buildObserver(); + + compactionRemovedCounter = meter + .counterBuilder(COMPACTION_REMOVED_COUNTER) + .setUnit("{message}") + .setDescription("The total number of messages removed by compaction.") + .buildObserver(); + + compactionOperationCounter = meter + .counterBuilder(COMPACTION_OPERATION_COUNTER) + .setUnit("{operation}") + .setDescription("The total number of compaction operations.") + .buildObserver(); + + compactionDurationSeconds = meter + .upDownCounterBuilder(COMPACTION_DURATION_SECONDS) + .ofDoubles() + .setUnit("s") + .setDescription("The total time duration of compaction operations on the topic.") + .buildObserver(); + + compactionBytesInCounter = meter + .counterBuilder(COMPACTION_BYTES_IN_COUNTER) + .setUnit("By") + .setDescription("The total count of bytes read by the compaction process for this topic.") + .buildObserver(); + + compactionBytesOutCounter = meter + .counterBuilder(COMPACTION_BYTES_OUT_COUNTER) + .setUnit("By") + .setDescription("The total count of bytes written by the compaction process for this topic.") + .buildObserver(); + + compactionEntriesCounter = meter + .counterBuilder(COMPACTION_ENTRIES_COUNTER) + .setUnit("{entry}") + .setDescription("The total number of compacted entries.") + .buildObserver(); + + compactionBytesCounter = meter + .counterBuilder(COMPACTION_BYTES_COUNTER) + .setUnit("By") + .setDescription("The total size of the compacted entries.") + .buildObserver(); + + transactionCounter = meter + .upDownCounterBuilder(TRANSACTION_COUNTER) + .setUnit("{transaction}") + .setDescription("The number of transactions on this topic.") + .buildObserver(); + + delayedSubscriptionCounter = meter + .upDownCounterBuilder(DELAYED_SUBSCRIPTION_COUNTER) + .setUnit("{entry}") + .setDescription("The total number of message batches (entries) delayed for dispatching.") + .buildObserver(); + + batchCallback = meter.batchCallback(() -> pulsar.getBrokerService() + .getTopics() + .values() + .stream() + .map(topicFuture -> topicFuture.getNow(Optional.empty())) + .forEach(topic -> topic.ifPresent(this::recordMetricsForTopic)), + subscriptionCounter, + producerCounter, + consumerCounter, + messageInCounter, + messageOutCounter, + bytesInCounter, + bytesOutCounter, + publishRateLimitHitCounter, + storageCounter, + storageLogicalCounter, + storageBacklogCounter, + storageOffloadedCounter, + backlogQuotaLimitSize, + backlogQuotaLimitTime, + backlogEvictionCounter, + backlogQuotaAge, + storageOutCounter, + storageInCounter, + compactionRemovedCounter, + compactionOperationCounter, + compactionDurationSeconds, + compactionBytesInCounter, + compactionBytesOutCounter, + compactionEntriesCounter, + compactionBytesCounter, + transactionCounter, + delayedSubscriptionCounter); + } + + @Override + public void close() { + batchCallback.close(); + } + + private void recordMetricsForTopic(Topic topic) { + var topicName = TopicName.get(topic.getName()); + var builder = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_DOMAIN, topicName.getDomain().toString()) + .put(OpenTelemetryAttributes.PULSAR_TENANT, topicName.getTenant()) + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, topicName.getNamespace()) + .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName.getPartitionedTopicName()); + if (topicName.isPartitioned()) { + builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, topicName.getPartitionIndex()); + } + var attributes = builder.build(); + + if (topic instanceof AbstractTopic abstractTopic) { + subscriptionCounter.record(abstractTopic.getSubscriptions().size(), attributes); + producerCounter.record(abstractTopic.getProducers().size(), attributes); + consumerCounter.record(abstractTopic.getNumberOfConsumers(), attributes); + + messageInCounter.record(abstractTopic.getMsgInCounter(), attributes); + messageOutCounter.record(abstractTopic.getMsgOutCounter(), attributes); + bytesInCounter.record(abstractTopic.getBytesInCounter(), attributes); + bytesOutCounter.record(abstractTopic.getBytesOutCounter(), attributes); + + publishRateLimitHitCounter.record(abstractTopic.getTotalPublishRateLimitCounter(), attributes); + + // Omitted: consumerMsgAckCounter + } + + if (topic instanceof PersistentTopic persistentTopic) { + var managedLedger = persistentTopic.getManagedLedger(); + var managedLedgerStats = persistentTopic.getManagedLedger().getStats(); + storageCounter.record(managedLedgerStats.getStoredMessagesSize(), attributes); + storageLogicalCounter.record(managedLedgerStats.getStoredMessagesLogicalSize(), attributes); + storageBacklogCounter.record(managedLedger.getEstimatedBacklogSize(), attributes); + storageOffloadedCounter.record(managedLedger.getOffloadedSize(), attributes); + storageInCounter.record(managedLedgerStats.getReadEntriesSucceededTotal(), attributes); + storageOutCounter.record(managedLedgerStats.getAddEntrySucceedTotal(), attributes); + + backlogQuotaLimitSize.record( + topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize(), + attributes); + backlogQuotaLimitTime.record( + topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime(), + attributes); + backlogQuotaAge.record(topic.getBestEffortOldestUnacknowledgedMessageAgeSeconds(), attributes); + var backlogQuotaMetrics = persistentTopic.getPersistentTopicMetrics().getBacklogQuotaMetrics(); + backlogEvictionCounter.record(backlogQuotaMetrics.getSizeBasedBacklogQuotaExceededEvictionCount(), + Attributes.builder() + .putAll(attributes) + .put(OpenTelemetryAttributes.PULSAR_BACKLOG_QUOTA_TYPE, "size") + .build()); + backlogEvictionCounter.record(backlogQuotaMetrics.getTimeBasedBacklogQuotaExceededEvictionCount(), + Attributes.builder() + .putAll(attributes) + .put(OpenTelemetryAttributes.PULSAR_BACKLOG_QUOTA_TYPE, "time") + .build()); + + var txnBuffer = persistentTopic.getTransactionBuffer(); + transactionCounter.record(txnBuffer.getOngoingTxnCount(), Attributes.builder() + .putAll(attributes) + .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "active") + .build()); + transactionCounter.record(txnBuffer.getCommittedTxnCount(), Attributes.builder() + .putAll(attributes) + .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "committed") + .build()); + transactionCounter.record(txnBuffer.getAbortedTxnCount(), Attributes.builder() + .putAll(attributes) + .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "aborted") + .build()); + + Optional.ofNullable(pulsar.getNullableCompactor()) + .map(Compactor::getStats) + .flatMap(compactorMXBean -> compactorMXBean.getCompactionRecordForTopic(topic.getName())) + .ifPresent(compactionRecord -> { + compactionRemovedCounter.record(compactionRecord.getCompactionRemovedEventCount(), attributes); + compactionOperationCounter.record(compactionRecord.getCompactionSucceedCount(), + Attributes.builder() + .putAll(attributes) + .put(OpenTelemetryAttributes.PULSAR_COMPACTION_STATUS, "success") + .build()); + compactionOperationCounter.record(compactionRecord.getCompactionFailedCount(), + Attributes.builder() + .putAll(attributes) + .put(OpenTelemetryAttributes.PULSAR_COMPACTION_STATUS, "failure") + .build()); + compactionDurationSeconds.record(MetricsUtil.convertToSeconds( + compactionRecord.getCompactionDurationTimeInMills(), TimeUnit.MILLISECONDS), attributes); + compactionBytesInCounter.record(compactionRecord.getCompactionReadBytes(), attributes); + compactionBytesOutCounter.record(compactionRecord.getCompactionWriteBytes(), attributes); + + persistentTopic.getCompactedTopicContext().map(CompactedTopicContext::getLedger) + .ifPresent(ledger -> { + compactionEntriesCounter.record(ledger.getLastAddConfirmed() + 1, attributes); + compactionBytesCounter.record(ledger.getLength(), attributes); + }); + }); + + var delayedMessages = topic.getSubscriptions().values().stream() + .map(Subscription::getDispatcher) + .filter(Objects::nonNull) + .mapToLong(Dispatcher::getNumberOfDelayedMessages) + .sum(); + delayedSubscriptionCounter.record(delayedMessages, attributes); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index 27288291d2969..e8ab7b095dc3c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -25,28 +25,45 @@ import org.apache.bookkeeper.mledger.util.StatsBuckets; import org.apache.commons.lang3.ArrayUtils; import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats; import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusLabels; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import org.apache.pulsar.common.policies.data.stats.TopicMetricBean; import org.apache.pulsar.compaction.CompactionRecord; import org.apache.pulsar.compaction.CompactorMXBean; +import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric; class TopicStats { + @PulsarDeprecatedMetric(newMetricName = OpenTelemetryTopicStats.SUBSCRIPTION_COUNTER) int subscriptionsCount; + @PulsarDeprecatedMetric(newMetricName = OpenTelemetryTopicStats.PRODUCER_COUNTER) int producersCount; + @PulsarDeprecatedMetric(newMetricName = OpenTelemetryTopicStats.CONSUMER_COUNTER) int consumersCount; + @PulsarDeprecatedMetric(newMetricName = OpenTelemetryTopicStats.MESSAGE_IN_COUNTER) double rateIn; + @PulsarDeprecatedMetric(newMetricName = OpenTelemetryTopicStats.MESSAGE_OUT_COUNTER) double rateOut; + @PulsarDeprecatedMetric(newMetricName = OpenTelemetryTopicStats.BYTES_IN_COUNTER) double throughputIn; + @PulsarDeprecatedMetric(newMetricName = OpenTelemetryTopicStats.BYTES_OUT_COUNTER) double throughputOut; + @PulsarDeprecatedMetric(newMetricName = OpenTelemetryTopicStats.MESSAGE_IN_COUNTER) long msgInCounter; + @PulsarDeprecatedMetric(newMetricName = OpenTelemetryTopicStats.BYTES_IN_COUNTER) long bytesInCounter; + @PulsarDeprecatedMetric(newMetricName = OpenTelemetryTopicStats.MESSAGE_OUT_COUNTER) long msgOutCounter; + @PulsarDeprecatedMetric(newMetricName = OpenTelemetryTopicStats.BYTES_OUT_COUNTER) long bytesOutCounter; + @PulsarDeprecatedMetric // Can be derived from MESSAGE_IN_COUNTER and BYTES_IN_COUNTER double averageMsgSize; + @PulsarDeprecatedMetric(newMetricName = OpenTelemetryTopicStats.TRANSACTION_COUNTER) long ongoingTxnCount; + @PulsarDeprecatedMetric(newMetricName = OpenTelemetryTopicStats.TRANSACTION_COUNTER) long abortedTxnCount; + @PulsarDeprecatedMetric(newMetricName = OpenTelemetryTopicStats.TRANSACTION_COUNTER) long committedTxnCount; public long msgBacklog; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java index 09f9f9b00abab..1d2af6638c33a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java @@ -45,6 +45,8 @@ public class CompactionRecord { private final LongAdder compactionSucceedCount = new LongAdder(); private final LongAdder compactionFailedCount = new LongAdder(); private final LongAdder compactionDurationTimeInMills = new LongAdder(); + private final LongAdder compactionReadBytes = new LongAdder(); + private final LongAdder compactionWriteBytes = new LongAdder(); public final StatsBuckets writeLatencyStats = new StatsBuckets(WRITE_LATENCY_BUCKETS_USEC); public final Rate writeRate = new Rate(); public final Rate readRate = new Rate(); @@ -83,10 +85,12 @@ public void addCompactionEndOp(boolean succeed) { public void addCompactionReadOp(long readableBytes) { readRate.recordEvent(readableBytes); + compactionReadBytes.add(readableBytes); } public void addCompactionWriteOp(long writeableBytes) { writeRate.recordEvent(writeableBytes); + compactionWriteBytes.add(writeableBytes); } public void addCompactionLatencyOp(long latency, TimeUnit unit) { @@ -123,8 +127,16 @@ public double getCompactionReadThroughput() { return readRate.getValueRate(); } + public long getCompactionReadBytes() { + return compactionReadBytes.sum(); + } + public double getCompactionWriteThroughput() { writeRate.calculateRate(); return writeRate.getValueRate(); } + + public long getCompactionWriteBytes() { + return compactionWriteBytes.sum(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java index 95b0d48c69a6c..eac816bd81089 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java @@ -37,6 +37,7 @@ */ package org.apache.pulsar.broker.admin; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -48,6 +49,7 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import io.opentelemetry.api.common.Attributes; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -59,6 +61,9 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil; +import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; import org.apache.pulsar.client.api.MessageId; @@ -71,6 +76,7 @@ import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.OffloadedReadPriority; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -100,6 +106,12 @@ public void setup() throws Exception { admin.namespaces().createNamespace(myNamespace, Set.of("test")); } + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder pulsarTestContextBuilder) { + super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder); + pulsarTestContextBuilder.enableOpenTelemetry(true); + } + @AfterMethod(alwaysRun = true) @Override public void cleanup() throws Exception { @@ -125,8 +137,18 @@ private void testOffload(String topicName, String mlName) throws Exception { ManagedLedgerInfo info = pulsar.getManagedLedgerFactory().getManagedLedgerInfo(mlName); assertEquals(info.ledgers.size(), 2); - assertEquals(admin.topics().offloadStatus(topicName).getStatus(), - LongRunningProcessStatus.Status.NOT_RUN); + assertEquals(admin.topics().offloadStatus(topicName).getStatus(), LongRunningProcessStatus.Status.NOT_RUN); + var topicNameObject = TopicName.get(topicName); + var attributes = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_DOMAIN, topicNameObject.getDomain().toString()) + .put(OpenTelemetryAttributes.PULSAR_TENANT, topicNameObject.getTenant()) + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, topicNameObject.getNamespace()) + .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicNameObject.getPartitionedTopicName()) + .build(); + // Verify the respective metric is 0 before the offload begins. + var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.STORAGE_OFFLOADED_COUNTER, + attributes, actual -> assertThat(actual).isZero()); admin.topics().triggerOffload(topicName, currentId); @@ -164,6 +186,11 @@ private void testOffload(String topicName, String mlName) throws Exception { assertEquals(firstUnoffloadedMessage.getEntryId(), 0); verify(offloader, times(2)).offload(any(), any(), any()); + + // Verify the metrics have been updated. + metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.STORAGE_OFFLOADED_COUNTER, + attributes, actual -> assertThat(actual).isPositive()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 0bf096fb5d76a..10d56ce2245f9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -19,8 +19,10 @@ package org.apache.pulsar.broker.auth; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithoutRecordingInvocations; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import com.google.common.collect.Sets; +import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.net.InetSocketAddress; @@ -739,5 +741,13 @@ protected void reconnectAllConnections() throws Exception { reconnectAllConnections((PulsarClientImpl) pulsarClient); } + protected void assertOtelMetricLongSumValue(String metricName, int value) { + assertThat(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics()) + .anySatisfy(metric -> OpenTelemetryAssertions.assertThat(metric) + .hasName(metricName) + .hasLongSumSatisfying( + sum -> sum.hasPointsSatisfying(point -> point.hasValue(value)))); + } + private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index f30b7f12b01eb..6be7023b161f1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -21,6 +21,8 @@ import static java.util.Map.entry; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongGaugeValue; +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; import static org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType.destination_storage; import static org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType.message_age; import static org.assertj.core.api.Assertions.assertThat; @@ -30,6 +32,8 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Sets; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import java.net.URL; import java.time.Duration; import java.util.ArrayList; @@ -45,10 +49,13 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil; +import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metrics; @@ -70,6 +77,8 @@ import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl; +import org.apache.pulsar.functions.worker.WorkerConfig; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.awaitility.Awaitility; import org.slf4j.Logger; @@ -94,6 +103,7 @@ public class BacklogQuotaManagerTest { LocalBookkeeperEnsemble bkEnsemble; PrometheusMetricsClient prometheusMetricsClient; + InMemoryMetricReader openTelemetryMetricReader; private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 2; private static final int MAX_ENTRIES_PER_LEDGER = 5; @@ -145,7 +155,9 @@ void setup() throws Exception { config.setTopicLevelPoliciesEnabled(true); config.setForceDeleteNamespaceAllowed(true); - pulsar = new PulsarService(config); + openTelemetryMetricReader = InMemoryMetricReader.create(); + pulsar = new PulsarService(config, new WorkerConfig(), Optional.empty(), exitCode -> { + }, BrokerOpenTelemetryTestUtil.getOpenTelemetrySdkBuilderConsumer(openTelemetryMetricReader)); pulsar.start(); adminUrl = new URL("http://127.0.0.1" + ":" + pulsar.getListenPortHTTP().get()); @@ -709,16 +721,17 @@ public void testTriggerBacklogTimeQuotaWithReader() throws Exception { public void testConsumerBacklogEvictionSizeQuota() throws Exception { assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), new HashMap<>()); + var backlogSizeLimit = 10 * 1024; admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder() - .limitSize(10 * 1024) + .limitSize(backlogSizeLimit) .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) .build()); @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, SECONDS) .build(); - final String topic1 = "persistent://prop/ns-quota/topic2" + UUID.randomUUID(); + final String topic1 = BrokerTestUtil.newUniqueName("persistent://prop/ns-quota/topic2"); final String subName1 = "c1"; final String subName2 = "c2"; final int numMsgs = 20; @@ -740,6 +753,21 @@ public void testConsumerBacklogEvictionSizeQuota() throws Exception { assertTrue(stats.getBacklogSize() < 10 * 1024, "Storage size is [" + stats.getStorageSize() + "]"); assertThat(evictionCountMetric("prop/ns-quota", topic1, "size")).isEqualTo(1); assertThat(evictionCountMetric("size")).isEqualTo(1); + + var attributes = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent") + .put(OpenTelemetryAttributes.PULSAR_TENANT, "prop") + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "prop/ns-quota") + .put(OpenTelemetryAttributes.PULSAR_TOPIC, topic1) + .build(); + var metrics = openTelemetryMetricReader.collectAllMetrics(); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.BACKLOG_QUOTA_LIMIT_SIZE, attributes, + backlogSizeLimit); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.BACKLOG_EVICTION_COUNTER, Attributes.builder() + .putAll(attributes) + .put(OpenTelemetryAttributes.PULSAR_BACKLOG_QUOTA_TYPE, "size") + .build(), + 1); } @Test @@ -812,16 +840,17 @@ private long evictionCountMetric(String quotaType) { public void testConsumerBacklogEvictionTimeQuota() throws Exception { assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), new HashMap<>()); + var backlogTimeLimit = TIME_TO_CHECK_BACKLOG_QUOTA; admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder() - .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) + .limitTime(backlogTimeLimit) .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) .build(), message_age); @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, SECONDS) .build(); - final String topic1 = "persistent://prop/ns-quota/topic3" + UUID.randomUUID(); + final String topic1 = BrokerTestUtil.newUniqueName("persistent://prop/ns-quota/topic3"); final String subName1 = "c1"; final String subName2 = "c2"; final int numMsgs = 14; @@ -844,7 +873,8 @@ public void testConsumerBacklogEvictionTimeQuota() throws Exception { ManagedLedgerImpl ml = (ManagedLedgerImpl) topic1Reference.getManagedLedger(); Position slowConsumerReadPos = ml.getSlowestConsumer().getReadPosition(); - Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000); + var delaySeconds = backlogTimeLimit * 2; + Thread.sleep(delaySeconds * 1000); rolloverStats(); TopicStats stats2 = getTopicStats(topic1); @@ -856,6 +886,23 @@ public void testConsumerBacklogEvictionTimeQuota() throws Exception { }); assertEquals(ml.getSlowestConsumer().getReadPosition(), slowConsumerReadPos); + + var attributes = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent") + .put(OpenTelemetryAttributes.PULSAR_TENANT, "prop") + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "prop/ns-quota") + .put(OpenTelemetryAttributes.PULSAR_TOPIC, topic1) + .build(); + var metrics = openTelemetryMetricReader.collectAllMetrics(); + assertMetricLongGaugeValue(metrics, OpenTelemetryTopicStats.BACKLOG_QUOTA_LIMIT_TIME, attributes, + backlogTimeLimit); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.BACKLOG_EVICTION_COUNTER, Attributes.builder() + .putAll(attributes) + .put(OpenTelemetryAttributes.PULSAR_BACKLOG_QUOTA_TYPE, "time") + .build(), + 1); + assertMetricLongGaugeValue(metrics, OpenTelemetryTopicStats.BACKLOG_QUOTA_AGE, attributes, + value -> assertThat(value).isGreaterThanOrEqualTo(delaySeconds)); } @Test(timeOut = 60000) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java index 0d517c014b315..312bfe0fc8ad7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java @@ -86,11 +86,11 @@ public void testThrottlingLookupRequestSemaphore() throws Exception { var metricName = BrokerService.TOPIC_LOOKUP_LIMIT_METRIC_NAME; // Validate that the configuration has not been overridden. assertThat(admin.brokers().getAllDynamicConfigurations()).doesNotContainKey(configName); - assertLongSumValue(metricName, 50_000); + assertOtelMetricLongSumValue(metricName, 50_000); assertThat(lookupRequestSemaphore.get().availablePermits()).isNotEqualTo(0); admin.brokers().updateDynamicConfiguration(configName, Integer.toString(0)); waitAtMost(1, TimeUnit.SECONDS).until(() -> lookupRequestSemaphore.get().availablePermits() == 0); - assertLongSumValue(metricName, 0); + assertOtelMetricLongSumValue(metricName, 0); } /** @@ -104,19 +104,11 @@ public void testThrottlingTopicLoadRequestSemaphore() throws Exception { var metricName = BrokerService.TOPIC_LOAD_LIMIT_METRIC_NAME; // Validate that the configuration has not been overridden. assertThat(admin.brokers().getAllDynamicConfigurations()).doesNotContainKey(configName); - assertLongSumValue(metricName, 5_000); + assertOtelMetricLongSumValue(metricName, 5_000); assertThat(topicLoadRequestSemaphore.get().availablePermits()).isNotEqualTo(0); admin.brokers().updateDynamicConfiguration(configName, Integer.toString(0)); waitAtMost(1, TimeUnit.SECONDS).until(() -> topicLoadRequestSemaphore.get().availablePermits() == 0); - assertLongSumValue(metricName, 0); - } - - private void assertLongSumValue(String metricName, int value) { - assertThat(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics()) - .anySatisfy(metric -> assertThat(metric) - .hasName(metricName) - .hasLongSumSatisfying( - sum -> sum.hasPointsSatisfying(point -> point.hasValue(value)))); + assertOtelMetricLongSumValue(metricName, 0); } /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java index ae7edde449631..3ca966d210886 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java @@ -24,6 +24,7 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import io.opentelemetry.api.common.Attributes; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -36,6 +37,9 @@ import org.apache.bookkeeper.client.BKException; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil; +import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -45,6 +49,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -69,6 +74,12 @@ public void cleanup() throws Exception { super.internalCleanup(); } + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder pulsarTestContextBuilder) { + super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder); + pulsarTestContextBuilder.enableOpenTelemetry(true); + } + @Test public void testDelayedDelivery() throws Exception { String topic = BrokerTestUtil.newUniqueName("testNegativeAcks"); @@ -106,6 +117,16 @@ public void testDelayedDelivery() throws Exception { Message msg = sharedConsumer.receive(100, TimeUnit.MILLISECONDS); assertNull(msg); + var attributes = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent") + .put(OpenTelemetryAttributes.PULSAR_TENANT, "public") + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "public/default") + .put(OpenTelemetryAttributes.PULSAR_TOPIC, "persistent://public/default/" + topic) + .build(); + var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics, + OpenTelemetryTopicStats.DELAYED_SUBSCRIPTION_COUNTER, attributes, 10); + for (int i = 0; i < 10; i++) { msg = failoverConsumer.receive(100, TimeUnit.MILLISECONDS); assertEquals(msg.getValue(), "msg-" + i); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java new file mode 100644 index 0000000000000..cb61677ab953d --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.util.Collection; +import java.util.Map; +import java.util.function.Consumer; +import org.apache.pulsar.opentelemetry.OpenTelemetryService; + +public class BrokerOpenTelemetryTestUtil { + // Creates an OpenTelemetrySdkBuilder customizer for use in tests. + public static Consumer getOpenTelemetrySdkBuilderConsumer( + InMemoryMetricReader reader) { + return sdkBuilder -> { + sdkBuilder.addMeterProviderCustomizer( + (meterProviderBuilder, __) -> meterProviderBuilder.registerMetricReader(reader)); + sdkBuilder.addPropertiesSupplier( + () -> Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false", + "otel.java.enabled.resource.providers", "none")); + }; + } + + public static void assertMetricDoubleSumValue(Collection metrics, String metricName, + Attributes attributes, Consumer valueConsumer) { + assertThat(metrics) + .anySatisfy(metric -> assertThat(metric) + .hasName(metricName) + .hasDoubleSumSatisfying(sum -> sum.satisfies( + sumData -> assertThat(sumData.getPoints()).anySatisfy( + point -> { + assertThat(point.getAttributes()).isEqualTo(attributes); + valueConsumer.accept(point.getValue()); + })))); + } + + public static void assertMetricLongSumValue(Collection metrics, String metricName, + Attributes attributes, long expected) { + assertMetricLongSumValue(metrics, metricName, attributes, actual -> assertThat(actual).isEqualTo(expected)); + } + + public static void assertMetricLongSumValue(Collection metrics, String metricName, + Attributes attributes, Consumer valueConsumer) { + assertThat(metrics) + .anySatisfy(metric -> assertThat(metric) + .hasName(metricName) + .hasLongSumSatisfying(sum -> sum.satisfies( + sumData -> assertThat(sumData.getPoints()).anySatisfy( + point -> { + assertThat(point.getAttributes()).isEqualTo(attributes); + valueConsumer.accept(point.getValue()); + })))); + } + + public static void assertMetricLongGaugeValue(Collection metrics, String metricName, + Attributes attributes, long expected) { + assertMetricLongGaugeValue(metrics, metricName, attributes, actual -> assertThat(actual).isEqualTo(expected)); + } + + public static void assertMetricLongGaugeValue(Collection metrics, String metricName, + Attributes attributes, Consumer valueConsumer) { + assertThat(metrics) + .anySatisfy(metric -> assertThat(metric) + .hasName(metricName) + .hasLongGaugeSatisfying(gauge -> gauge.satisfies( + pointData -> assertThat(pointData.getPoints()).anySatisfy( + point -> { + assertThat(point.getAttributes()).isEqualTo(attributes); + valueConsumer.accept(point.getValue()); + })))); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStatsTest.java new file mode 100644 index 0000000000000..c6d07c018c806 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStatsTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; +import static org.assertj.core.api.Assertions.assertThat; +import io.opentelemetry.api.common.Attributes; +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.policies.data.PublishRate; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class OpenTelemetryTopicStatsTest extends BrokerTestBase { + + @BeforeMethod(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.baseSetup(); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) { + super.customizeMainPulsarTestContextBuilder(builder); + builder.enableOpenTelemetry(true); + } + + @Test(timeOut = 30_000) + public void testMessagingMetrics() throws Exception { + var topicName = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testMessagingMetrics"); + admin.topics().createNonPartitionedTopic(topicName); + + var producerCount = 5; + var messagesPerProducer = 2; + var consumerCount = 3; + var messageCount = producerCount * messagesPerProducer; + + for (int i = 0; i < producerCount; i++) { + var producer = registerCloseable(pulsarClient.newProducer().topic(topicName).create()); + for (int j = 0; j < messagesPerProducer; j++) { + producer.send(String.format("producer-%d-msg-%d", i, j).getBytes()); + } + } + + var cdl = new CountDownLatch(consumerCount); + for (int i = 0; i < consumerCount; i++) { + var consumer = registerCloseable(pulsarClient.newConsumer().topic(topicName) + .subscriptionName("test") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionType(SubscriptionType.Shared) + .subscribe()); + consumer.receiveAsync().orTimeout(100, TimeUnit.MILLISECONDS).handle((__, ex) -> { + cdl.countDown(); + return null; + }); + } + cdl.await(); + + var attributes = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent") + .put(OpenTelemetryAttributes.PULSAR_TENANT, "prop") + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "prop/ns-abc") + .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName) + .build(); + + var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.SUBSCRIPTION_COUNTER, attributes, 1); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.PRODUCER_COUNTER, attributes, producerCount); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.CONSUMER_COUNTER, attributes, consumerCount); + + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.MESSAGE_IN_COUNTER, attributes, messageCount); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.MESSAGE_OUT_COUNTER, attributes, messageCount); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.BYTES_IN_COUNTER, attributes, + actual -> assertThat(actual).isPositive()); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.BYTES_OUT_COUNTER, attributes, + actual -> assertThat(actual).isPositive()); + + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.STORAGE_COUNTER, attributes, + actual -> assertThat(actual).isPositive()); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.STORAGE_LOGICAL_COUNTER, attributes, + actual -> assertThat(actual).isPositive()); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.STORAGE_BACKLOG_COUNTER, attributes, + actual -> assertThat(actual).isPositive()); + + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.STORAGE_OUT_COUNTER, attributes, messageCount); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.STORAGE_IN_COUNTER, attributes, messageCount); + } + + @Test(timeOut = 30_000) + public void testPublishRateLimitMetric() throws Exception { + var topicName = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testPublishRateLimitMetric"); + admin.topics().createNonPartitionedTopic(topicName); + + var publishRate = new PublishRate(1, -1); + admin.topicPolicies().setPublishRate(topicName, publishRate); + Awaitility.await().until(() -> Objects.equals(publishRate, admin.topicPolicies().getPublishRate(topicName))); + + @Cleanup + var producer = pulsarClient.newProducer().topic(topicName).create(); + producer.send("msg".getBytes()); + + var attributes = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent") + .put(OpenTelemetryAttributes.PULSAR_TENANT, "prop") + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "prop/ns-abc") + .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName) + .build(); + + var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.PUBLISH_RATE_LIMIT_HIT_COUNTER, attributes, 1); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java index 13209ccfce7d3..dceb18cbeaa9a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java @@ -28,7 +28,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; @@ -55,6 +54,7 @@ import org.apache.pulsar.broker.resources.TopicResources; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.ServerCnx; +import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil; import org.apache.pulsar.broker.storage.ManagedLedgerStorage; import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown; import org.apache.pulsar.common.util.PortManager; @@ -67,7 +67,6 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl; import org.apache.pulsar.metadata.impl.ZKMetadataStore; -import org.apache.pulsar.opentelemetry.OpenTelemetryService; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.MockZooKeeperSession; @@ -746,13 +745,8 @@ protected void initializePulsarServices(SpyConfig spyConfig, Builder builder) { Consumer openTelemetrySdkBuilderCustomizer; if (builder.enableOpenTelemetry) { var reader = InMemoryMetricReader.create(); - openTelemetrySdkBuilderCustomizer = sdkBuilder -> { - sdkBuilder.addMeterProviderCustomizer( - (meterProviderBuilder, __) -> meterProviderBuilder.registerMetricReader(reader)); - sdkBuilder.addPropertiesSupplier( - () -> Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false")); - }; openTelemetryMetricReader(reader); + openTelemetrySdkBuilderCustomizer = BrokerOpenTelemetryTestUtil.getOpenTelemetrySdkBuilderConsumer(reader); } else { openTelemetrySdkBuilderCustomizer = null; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index e45924e8bb4f2..ed1b74c46e0f0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -40,6 +40,7 @@ import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.concurrent.DefaultThreadFactory; +import io.opentelemetry.api.common.Attributes; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; @@ -79,6 +80,7 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.commons.lang3.reflect.MethodUtils; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor; @@ -91,6 +93,8 @@ import org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil; +import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats; import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory; import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; @@ -142,6 +146,7 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactionServiceFactory; import org.apache.pulsar.compaction.PulsarCompactionServiceFactory; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; @@ -182,7 +187,7 @@ protected void cleanup() throws Exception { @Test public void testTopicTransactionMetrics() throws Exception { - final String topic = "persistent://tnx/ns1/test_transaction_topic"; + final String topic = BrokerTestUtil.newUniqueName("persistent://tnx/ns1/test_transaction_topic"); @Cleanup Producer producer = this.pulsarClient.newProducer() @@ -216,6 +221,33 @@ public void testTopicTransactionMetrics() throws Exception { assertEquals(stats.committedTxnCount, 1); assertEquals(stats.abortedTxnCount, 1); assertEquals(stats.ongoingTxnCount, 1); + + var attributes = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent") + .put(OpenTelemetryAttributes.PULSAR_TENANT, "tnx") + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "tnx/ns1") + .put(OpenTelemetryAttributes.PULSAR_TOPIC, topic) + .build(); + + var metrics = pulsarTestContexts.get(0).getOpenTelemetryMetricReader().collectAllMetrics(); + BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.TRANSACTION_COUNTER, + Attributes.builder() + .putAll(attributes) + .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "committed") + .build(), + 1); + BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.TRANSACTION_COUNTER, + Attributes.builder() + .putAll(attributes) + .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "aborted") + .build(), + 1); + BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.TRANSACTION_COUNTER, + Attributes.builder() + .putAll(attributes) + .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "active") + .build(), + 1); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index 1ff835732aab5..4ab886492a4eb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -169,6 +169,7 @@ protected void startBroker() throws Exception { PulsarTestContext.builder() .brokerInterceptor(new CounterBrokerInterceptor()) .spyByDefault() + .enableOpenTelemetry(true) .config(conf); if (i > 0) { testContextBuilder.reuseMockBookkeeperAndMetadataStores(pulsarTestContexts.get(0)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 7a527a16889e0..2d2019b38eddf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -189,6 +189,9 @@ public void testMultipleBrokerLookup() throws Exception { doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class)); loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1)); + // Disable collecting topic stats during this test, as it deadlocks on access to map BrokerService.topics. + pulsar2.getOpenTelemetryTopicStats().close(); + var metricReader = pulsarTestContext.getOpenTelemetryMetricReader(); var lookupRequestSemaphoreField = BrokerService.class.getDeclaredField("lookupRequestSemaphore"); lookupRequestSemaphoreField.setAccessible(true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java index 71700ef83a443..debc3dd5e3f98 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java @@ -18,13 +18,17 @@ */ package org.apache.pulsar.compaction; +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricDoubleSumValue; +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; import static org.apache.pulsar.client.impl.RawReaderTest.extractKey; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.buffer.ByteBuf; +import io.opentelemetry.api.common.Attributes; import java.util.ArrayList; import java.util.Enumeration; import java.util.HashMap; @@ -37,7 +41,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - import lombok.Cleanup; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerEntry; @@ -45,9 +48,12 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -63,6 +69,7 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.awaitility.Awaitility; import org.mockito.Mockito; import org.testng.Assert; @@ -106,6 +113,12 @@ public void cleanup() throws Exception { compactionScheduler.shutdownNow(); } + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder pulsarTestContextBuilder) { + super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder); + pulsarTestContextBuilder.enableOpenTelemetry(true); + } + protected long compact(String topic) throws ExecutionException, InterruptedException { return compactor.compact(topic).get(); } @@ -186,7 +199,7 @@ public void testCompaction() throws Exception { @Test public void testAllCompactedOut() throws Exception { - String topicName = "persistent://my-property/use/my-ns/testAllCompactedOut"; + String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/use/my-ns/testAllCompactedOut"); // set retain null key to true boolean oldRetainNullKey = pulsar.getConfig().isTopicCompactionRetainNullKey(); pulsar.getConfig().setTopicCompactionRetainNullKey(true); @@ -208,6 +221,34 @@ public void testAllCompactedOut() throws Exception { LongRunningProcessStatus.Status.SUCCESS); }); + var attributes = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent") + .put(OpenTelemetryAttributes.PULSAR_TENANT, "my-property") + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "my-property/use/my-ns") + .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName) + .build(); + var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_REMOVED_COUNTER, attributes, 1); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_OPERATION_COUNTER, Attributes.builder() + .putAll(attributes) + .put(OpenTelemetryAttributes.PULSAR_COMPACTION_STATUS, "success") + .build(), + 1); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_OPERATION_COUNTER, Attributes.builder() + .putAll(attributes) + .put(OpenTelemetryAttributes.PULSAR_COMPACTION_STATUS, "failure") + .build(), + 0); + assertMetricDoubleSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_DURATION_SECONDS, attributes, + actual -> assertThat(actual).isPositive()); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_BYTES_IN_COUNTER, attributes, + actual -> assertThat(actual).isPositive()); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_BYTES_OUT_COUNTER, attributes, + actual -> assertThat(actual).isPositive()); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_ENTRIES_COUNTER, attributes, 1); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_BYTES_COUNTER, attributes, + actual -> assertThat(actual).isPositive()); + producer.newMessage().key("K1").value(null).sendAsync(); producer.flush(); diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java index bdb002cb359ff..6088f52f72c61 100644 --- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java +++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java @@ -29,4 +29,44 @@ public interface OpenTelemetryAttributes { * {@link OpenTelemetryService}. */ AttributeKey PULSAR_CLUSTER = AttributeKey.stringKey("pulsar.cluster"); + + /** + * The name of the Pulsar namespace. + */ + AttributeKey PULSAR_NAMESPACE = AttributeKey.stringKey("pulsar.namespace"); + + /** + * The name of the Pulsar tenant. + */ + AttributeKey PULSAR_TENANT = AttributeKey.stringKey("pulsar.tenant"); + + /** + * The Pulsar topic domain. + */ + AttributeKey PULSAR_DOMAIN = AttributeKey.stringKey("pulsar.domain"); + + /** + * The name of the Pulsar topic. + */ + AttributeKey PULSAR_TOPIC = AttributeKey.stringKey("pulsar.topic"); + + /** + * The partition index of a Pulsar topic. + */ + AttributeKey PULSAR_PARTITION_INDEX = AttributeKey.longKey("pulsar.partition.index"); + + /** + * The status of the Pulsar transaction. + */ + AttributeKey PULSAR_TRANSACTION_STATUS = AttributeKey.stringKey("pulsar.transaction.status"); + + /** + * The status of the Pulsar compaction operation. + */ + AttributeKey PULSAR_COMPACTION_STATUS = AttributeKey.stringKey("pulsar.compaction.status"); + + /** + * The type of the backlog quota. + */ + AttributeKey PULSAR_BACKLOG_QUOTA_TYPE = AttributeKey.stringKey("pulsar.backlog.quota.type"); }