Skip to content

Commit

Permalink
Expose metrics for pending acquire operation latency
Browse files Browse the repository at this point in the history
This is related to reactor/reactor-netty#2946
  • Loading branch information
violetagg committed Nov 16, 2023
1 parent 013443b commit cc923dd
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,6 +46,8 @@ final class MicrometerMetricsRecorder implements PoolMetricsRecorder {
private final Timer resetMeter;
private final Timer resourceSummaryIdleness;
private final Timer resourceSummaryLifetime;
private final Timer pendingSuccessTimer;
private final Timer pendingFailureTimer;

MicrometerMetricsRecorder(String poolName, MeterRegistry registry) {
this.poolName = poolName;
Expand All @@ -70,6 +72,11 @@ final class MicrometerMetricsRecorder implements PoolMetricsRecorder {

resourceSummaryLifetime = this.meterRegistry.timer(SUMMARY_LIFETIME.getName(), nameTag);
resourceSummaryIdleness = this.meterRegistry.timer(SUMMARY_IDLENESS.getName(), nameTag);

pendingSuccessTimer = this.meterRegistry.timer(PENDING.getName(),
nameTag.and(PendingTags.OUTCOME_SUCCESS));
pendingFailureTimer = this.meterRegistry.timer(PENDING.getName(),
nameTag.and(PendingTags.OUTCOME_FAILURE));
}

@Override
Expand Down Expand Up @@ -116,4 +123,14 @@ public void recordSlowPath() {
public void recordFastPath() {
recycledNotableFastPathCounter.increment();
}

@Override
public void recordPendingSuccessAndLatency(long latencyMs) {
pendingSuccessTimer.record(latencyMs, TimeUnit.MILLISECONDS);
}

@Override
public void recordPendingFailureAndLatency(long latencyMs) {
pendingFailureTimer.record(latencyMs, TimeUnit.MILLISECONDS);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@
* Meters used by {@link Micrometer} utility.
*
* @author Simon Baslé
* @author Violeta Georgieva
*/
enum PoolMetersDocumentation implements MeterDocumentation {

Expand Down Expand Up @@ -210,6 +211,23 @@ public Meter.Type getType() {
public KeyName[] getKeyNames() {
return CommonTags.values();
}
},

PENDING {
@Override
public String getName() {
return "reactor.pool.pending";
}

@Override
public Meter.Type getType() {
return Meter.Type.TIMER;
}

@Override
public KeyName[] getKeyNames() {
return KeyName.merge(CommonTags.values(), PendingTags.values());
}
};

public enum AllocationTags implements KeyName {
Expand Down Expand Up @@ -257,4 +275,23 @@ public String asString() {
}
}
}

public enum PendingTags implements KeyName {

/**
* Indicates whether the pending acquire operation finished with a {@code success} or {@code failure} i.e.
* whether an allocation operation was triggered or a {@link reactor.pool.PoolAcquireTimeoutException}
* was thrown.
*/
OUTCOME {
@Override
public String asString() {
return "pool.pending.outcome";
}
};

public static final Tag OUTCOME_SUCCESS = Tag.of(OUTCOME.asString(), "success");
public static final Tag OUTCOME_FAILURE = Tag.of(OUTCOME.asString(), "failure");

}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -74,7 +74,9 @@ void metersRegisteredForRecorder() {
"reactor.pool.recycled.notable[tag(pool.name=testRecorder), tag(pool.recycling.path=slow)]",
"reactor.pool.reset[tag(pool.name=testRecorder)]",
"reactor.pool.resources.summary.lifetime[tag(pool.name=testRecorder)]",
"reactor.pool.resources.summary.idleness[tag(pool.name=testRecorder)]"
"reactor.pool.resources.summary.idleness[tag(pool.name=testRecorder)]",
"reactor.pool.pending[tag(pool.name=testRecorder), tag(pool.pending.outcome=success)]",
"reactor.pool.pending[tag(pool.name=testRecorder), tag(pool.pending.outcome=failure)]"
);
}

Expand Down Expand Up @@ -104,7 +106,9 @@ void micrometerInstrumentedPoolRegistersGaugeAndRecorderMetrics() {
"reactor.pool.recycled.notable[tag(pool.name=testMetrics), tag(pool.recycling.path=slow)]",
"reactor.pool.reset[tag(pool.name=testMetrics)]",
"reactor.pool.resources.summary.lifetime[tag(pool.name=testMetrics)]",
"reactor.pool.resources.summary.idleness[tag(pool.name=testMetrics)]"
"reactor.pool.resources.summary.idleness[tag(pool.name=testMetrics)]",
"reactor.pool.pending[tag(pool.name=testMetrics), tag(pool.pending.outcome=success)]",
"reactor.pool.pending[tag(pool.name=testMetrics), tag(pool.pending.outcome=failure)]"
);
}
}
25 changes: 19 additions & 6 deletions reactor-pool/src/main/java/reactor/pool/AbstractPool.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,6 +44,7 @@
* related classes like {@link AbstractPooledRef} or {@link Borrower}.
*
* @author Simon Baslé
* @author Violeta Georgieva
*/
abstract class AbstractPool<POOLABLE> implements InstrumentedPool<POOLABLE>,
InstrumentedPool.PoolMetrics {
Expand Down Expand Up @@ -125,7 +126,7 @@ public boolean isInactiveForMoreThan(Duration duration) {

/**
* Note to implementors: stop the {@link Borrower} countdown by calling
* {@link Borrower#stopPendingCountdown()} as soon as it is known that a resource is
* {@link Borrower#stopPendingCountdown(boolean)} as soon as it is known that a resource is
* available or is in the process of being allocated.
*/
abstract void doAcquire(Borrower<POOLABLE> borrower);
Expand Down Expand Up @@ -382,6 +383,7 @@ public String toString() {
* an {@link AbstractPool}.
*
* @author Simon Baslé
* @author Violeta Georgieva
*/
static final class Borrower<POOLABLE> extends AtomicBoolean implements Scannable, Subscription, Runnable {

Expand All @@ -391,6 +393,7 @@ static final class Borrower<POOLABLE> extends AtomicBoolean implements Scannable
final AbstractPool<POOLABLE> pool;
final Duration pendingAcquireTimeout;

long pendingAcquireStart;
Disposable timeoutTask;

Borrower(CoreSubscriber<? super AbstractPooledRef<POOLABLE>> actual,
Expand All @@ -409,6 +412,8 @@ Context currentContext() {
@Override
public void run() {
if (Borrower.this.compareAndSet(false, true)) {
// this is failure, a timeout was observed
pool.metricsRecorder.recordPendingFailureAndLatency(pool.clock.millis() - pendingAcquireStart);
pool.cancelAcquire(Borrower.this);
actual.onError(new PoolAcquireTimeoutException(pendingAcquireTimeout));
}
Expand All @@ -423,6 +428,7 @@ public void request(long n) {
boolean noPermits = pool.poolConfig.allocationStrategy().estimatePermitCount() == 0;

if (!pendingAcquireTimeout.isZero() && noIdle && noPermits) {
pendingAcquireStart = pool.clock.millis();
timeoutTask = this.pool.config().pendingAcquireTimer().apply(this, pendingAcquireTimeout);
}
//doAcquire should interrupt the countdown if there is either an available
Expand All @@ -434,15 +440,22 @@ public void request(long n) {
/**
* Stop the countdown started when calling {@link AbstractPool#doAcquire(Borrower)}.
*/
void stopPendingCountdown() {
void stopPendingCountdown(boolean success) {
if (!timeoutTask.isDisposed()) {
if (success) {
pool.metricsRecorder.recordPendingSuccessAndLatency(pool.clock.millis() - pendingAcquireStart);
} else {
pool.metricsRecorder.recordPendingFailureAndLatency(pool.clock.millis() - pendingAcquireStart);
}
}
timeoutTask.dispose();
}

@Override
public void cancel() {
set(true);
pool.cancelAcquire(this);
stopPendingCountdown();
stopPendingCountdown(true); // this is not failure, the subscription was canceled
}

@Override
Expand All @@ -457,7 +470,7 @@ public Object scanUnsafe(Attr key) {
}

void deliver(AbstractPooledRef<POOLABLE> poolSlot) {
stopPendingCountdown();
stopPendingCountdown(true);
if (get()) {
//CANCELLED
poolSlot.release().subscribe(aVoid -> {}, e -> Operators.onErrorDropped(e, Context.empty())); //actual mustn't receive onError
Expand All @@ -470,7 +483,7 @@ void deliver(AbstractPooledRef<POOLABLE> poolSlot) {
}

void fail(Throwable error) {
stopPendingCountdown();
stopPendingCountdown(false);
if (!get()) {
actual.onError(error);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
* A No-Op {@link PoolMetricsRecorder} that can be used as a default if instrumentation is not desired.
*
* @author Simon Baslé
* @author Violeta Georgieva
*/
final class NoOpPoolMetricsRecorder implements PoolMetricsRecorder {

Expand Down Expand Up @@ -73,4 +74,14 @@ public void recordSlowPath() {
public void recordFastPath() {

}

@Override
public void recordPendingSuccessAndLatency(long latencyMs) {

}

@Override
public void recordPendingFailureAndLatency(long latencyMs) {

}
}
25 changes: 24 additions & 1 deletion reactor-pool/src/main/java/reactor/pool/PoolMetricsRecorder.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@
* responsibility of a {@link java.time.Clock}.
*
* @author Simon Baslé
* @author Violeta Georgieva
*/
public interface PoolMetricsRecorder {

Expand Down Expand Up @@ -77,4 +78,26 @@ public interface PoolMetricsRecorder {
* Record the fact that a {@link Pool} has a fast path of recycling and just used it.
*/
void recordFastPath();

/**
* Record a latency for successful pending acquire operation.
* A successful pending acquire operation is such that triggers an allocation operation.
* Implies incrementing a pending acquire success counter as well.
* @param latencyMs the latency in milliseconds
* @since 1.0.4
*/
default void recordPendingSuccessAndLatency(long latencyMs) {
// noop
}

/**
* Record a latency for failed pending acquire.
* A failed pending acquire operation is such that finishes with {@link PoolAcquireTimeoutException}.
* Implies incrementing a pending acquire failure counter as well.
* @param latencyMs the latency in milliseconds
* @since 1.0.4
*/
default void recordPendingFailureAndLatency(long latencyMs) {
// noop
}
}
4 changes: 2 additions & 2 deletions reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ private void drainLoop() {
borrower.fail(new PoolShutdownException());
return;
}
borrower.stopPendingCountdown();
borrower.stopPendingCountdown(true);
ACQUIRED.incrementAndGet(this);
poolConfig.acquisitionScheduler()
.schedule(() -> borrower.deliver(slot));
Expand Down Expand Up @@ -412,7 +412,7 @@ private void drainLoop() {
borrower.fail(new PoolShutdownException());
return;
}
borrower.stopPendingCountdown();
borrower.stopPendingCountdown(true);
long start = clock.millis();
Mono<POOLABLE> allocator = allocatorWithScheduler();

Expand Down
47 changes: 47 additions & 0 deletions reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@

/**
* @author Simon Baslé
* @author Violeta Georgieva
*/
public class CommonPoolTest {

Expand Down Expand Up @@ -2680,4 +2681,50 @@ void testIssue_174(PoolStyle style) {
assertThat(pool.config().allocationStrategy().estimatePermitCount()).isEqualTo(0);
assertThat(pool.metrics().idleSize()).isEqualTo(10);
}

@ParameterizedTestWithName
@MethodSource("allPools")
@Tag("metrics")
void recordsPendingCountAndLatencies(PoolStyle configAdjuster) {
PoolBuilder<String, ?> builder = PoolBuilder
.from(Mono.defer(() -> Mono.just("foo")))
.metricsRecorder(recorder)
.sizeBetween(0, 1)
.clock(recorder.getClock());
Pool<String> pool = configAdjuster.apply(builder);

PooledRef<String> pooledRef = pool.acquire(Duration.ofMillis(1)).block(Duration.ofSeconds(1)); //success
assertThat(pooledRef).isNotNull();

pool.acquire(Duration.ofMillis(50)).subscribe(); //success

pool.acquire(Duration.ofMillis(1))
.as(StepVerifier::create)
.expectError(PoolAcquireTimeoutException.class)
.verify(Duration.ofSeconds(1)); //error

pooledRef.release().block(Duration.ofSeconds(1));

assertThat(recorder.getPendingTotalCount())
.as("total pending")
.isEqualTo(2);
assertThat(recorder.getPendingSuccessCount())
.as("pending success")
.isEqualTo(1)
.isEqualTo(recorder.getPendingSuccessHistogram().getTotalCount());
assertThat(recorder.getPendingErrorCount())
.as("pending errors")
.isEqualTo(1)
.isEqualTo(recorder.getPendingErrorHistogram().getTotalCount());

long minSuccess = recorder.getPendingSuccessHistogram().getMinValue();
assertThat(minSuccess)
.as("pending success latency")
.isGreaterThanOrEqualTo(1L);

long minError = recorder.getPendingErrorHistogram().getMinValue();
assertThat(minError)
.as("pending error latency")
.isGreaterThanOrEqualTo(1L);
}
}
Loading

0 comments on commit cc923dd

Please sign in to comment.