Skip to content

Commit

Permalink
[feat][broker] PIP-264: Add topic messaging metrics (apache#22467)
Browse files Browse the repository at this point in the history
  • Loading branch information
dragosvictor authored May 1, 2024
1 parent a904863 commit 4f3cc6c
Show file tree
Hide file tree
Showing 21 changed files with 1,039 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public interface ManagedLedgerMXBean {
*/
long getAddEntrySucceed();

/**
* @return the total number of addEntry requests that succeeded
*/
long getAddEntrySucceedTotal();

/**
* @return the number of addEntry requests that failed
*/
Expand All @@ -100,6 +105,11 @@ public interface ManagedLedgerMXBean {
*/
long getReadEntriesSucceeded();

/**
* @return the total number of readEntries requests that succeeded
*/
long getReadEntriesSucceededTotal();

/**
* @return the number of readEntries requests that failed
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ public long getAddEntrySucceed() {
return addEntryOps.getCount();
}

@Override
public long getAddEntrySucceedTotal() {
return addEntryOps.getTotalCount();
}

@Override
public long getAddEntryErrors() {
return addEntryOpsFailed.getCount();
Expand All @@ -240,6 +245,11 @@ public long getReadEntriesSucceeded() {
return readEntriesOps.getCount();
}

@Override
public long getReadEntriesSucceededTotal() {
return readEntriesOps.getTotalCount();
}

@Override
public long getReadEntriesErrors() {
return readEntriesOpsFailed.getCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,12 @@ public void simple() throws Exception {
assertEquals(mbean.getAddEntryWithReplicasBytesRate(), 0.0);
assertEquals(mbean.getAddEntryMessagesRate(), 0.0);
assertEquals(mbean.getAddEntrySucceed(), 0);
assertEquals(mbean.getAddEntrySucceedTotal(), 0);
assertEquals(mbean.getAddEntryErrors(), 0);
assertEquals(mbean.getReadEntriesBytesRate(), 0.0);
assertEquals(mbean.getReadEntriesRate(), 0.0);
assertEquals(mbean.getReadEntriesSucceeded(), 0);
assertEquals(mbean.getReadEntriesSucceededTotal(), 0);
assertEquals(mbean.getReadEntriesErrors(), 0);
assertEquals(mbean.getMarkDeleteRate(), 0.0);

Expand All @@ -105,10 +107,12 @@ public void simple() throws Exception {
assertEquals(mbean.getAddEntryWithReplicasBytesRate(), 1600.0);
assertEquals(mbean.getAddEntryMessagesRate(), 2.0);
assertEquals(mbean.getAddEntrySucceed(), 2);
assertEquals(mbean.getAddEntrySucceedTotal(), 2);
assertEquals(mbean.getAddEntryErrors(), 0);
assertEquals(mbean.getReadEntriesBytesRate(), 0.0);
assertEquals(mbean.getReadEntriesRate(), 0.0);
assertEquals(mbean.getReadEntriesSucceeded(), 0);
assertEquals(mbean.getReadEntriesSucceededTotal(), 0);
assertEquals(mbean.getReadEntriesErrors(), 0);
assertTrue(mbean.getMarkDeleteRate() > 0.0);

Expand All @@ -134,10 +138,14 @@ public void simple() throws Exception {
assertEquals(mbean.getReadEntriesBytesRate(), 600.0);
assertEquals(mbean.getReadEntriesRate(), 1.0);
assertEquals(mbean.getReadEntriesSucceeded(), 1);
assertEquals(mbean.getReadEntriesSucceededTotal(), 1);
assertEquals(mbean.getReadEntriesErrors(), 0);
assertEquals(mbean.getNumberOfMessagesInBacklog(), 1);
assertEquals(mbean.getMarkDeleteRate(), 0.0);

assertEquals(mbean.getAddEntrySucceed(), 0);
assertEquals(mbean.getAddEntrySucceedTotal(), 2);

factory.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
Expand Down Expand Up @@ -252,6 +253,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {

private MetricsGenerator metricsGenerator;
private final PulsarBrokerOpenTelemetry openTelemetry;
private OpenTelemetryTopicStats openTelemetryTopicStats;

private TransactionMetadataStoreService transactionMetadataStoreService;
private TransactionBufferProvider transactionBufferProvider;
Expand Down Expand Up @@ -631,6 +633,10 @@ public CompletableFuture<Void> closeAsync() {
brokerClientSharedTimer.stop();
monotonicSnapshotClock.close();

if (openTelemetryTopicStats != null) {
openTelemetryTopicStats.close();
}

asyncCloseFutures.add(EventLoopUtil.shutdownGracefully(ioEventLoopGroup));


Expand Down Expand Up @@ -771,6 +777,8 @@ public void start() throws PulsarServerException {
config.getDefaultRetentionTimeInMinutes() * 60));
}

openTelemetryTopicStats = new OpenTelemetryTopicStats(this);

localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic())
: null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
private static final AtomicLongFieldUpdater<AbstractTopic> RATE_LIMITED_UPDATER =
AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "publishRateLimitedTimes");
protected volatile long publishRateLimitedTimes = 0L;
private static final AtomicLongFieldUpdater<AbstractTopic> TOTAL_RATE_LIMITED_UPDATER =
AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "totalPublishRateLimitedCounter");
protected volatile long totalPublishRateLimitedCounter = 0L;

private static final AtomicIntegerFieldUpdater<AbstractTopic> USER_CREATED_PRODUCER_COUNTER_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractTopic.class, "userCreatedProducerCount");
Expand Down Expand Up @@ -897,6 +900,7 @@ public void recordAddLatency(long latency, TimeUnit unit) {

@Override
public long increasePublishLimitedTimes() {
TOTAL_RATE_LIMITED_UPDATER.incrementAndGet(this);
return RATE_LIMITED_UPDATER.incrementAndGet(this);
}

Expand Down Expand Up @@ -1185,6 +1189,10 @@ public long getBytesOutCounter() {
+ sumSubscriptions(AbstractSubscription::getBytesOutCounter);
}

public long getTotalPublishRateLimitCounter() {
return TOTAL_RATE_LIMITED_UPDATER.get(this);
}

private long sumSubscriptions(ToLongFunction<AbstractSubscription> toCounter) {
return getSubscriptions().values().stream()
.map(AbstractSubscription.class::cast)
Expand Down
Loading

0 comments on commit 4f3cc6c

Please sign in to comment.