Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose metrics for pending acquire operation latency #178

Merged
merged 3 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,6 +46,8 @@ final class MicrometerMetricsRecorder implements PoolMetricsRecorder {
private final Timer resetMeter;
private final Timer resourceSummaryIdleness;
private final Timer resourceSummaryLifetime;
private final Timer pendingSuccessTimer;
private final Timer pendingFailureTimer;

MicrometerMetricsRecorder(String poolName, MeterRegistry registry) {
this.poolName = poolName;
Expand All @@ -70,6 +72,11 @@ final class MicrometerMetricsRecorder implements PoolMetricsRecorder {

resourceSummaryLifetime = this.meterRegistry.timer(SUMMARY_LIFETIME.getName(), nameTag);
resourceSummaryIdleness = this.meterRegistry.timer(SUMMARY_IDLENESS.getName(), nameTag);

pendingSuccessTimer = this.meterRegistry.timer(PENDING.getName(),
nameTag.and(PendingTags.OUTCOME_SUCCESS));
pendingFailureTimer = this.meterRegistry.timer(PENDING.getName(),
nameTag.and(PendingTags.OUTCOME_FAILURE));
}

@Override
Expand Down Expand Up @@ -116,4 +123,14 @@ public void recordSlowPath() {
public void recordFastPath() {
recycledNotableFastPathCounter.increment();
}

@Override
public void recordPendingSuccessAndLatency(long latencyMs) {
pendingSuccessTimer.record(latencyMs, TimeUnit.MILLISECONDS);
}

@Override
public void recordPendingFailureAndLatency(long latencyMs) {
pendingFailureTimer.record(latencyMs, TimeUnit.MILLISECONDS);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@
* Meters used by {@link Micrometer} utility.
*
* @author Simon Baslé
* @author Violeta Georgieva
*/
enum PoolMetersDocumentation implements MeterDocumentation {

Expand Down Expand Up @@ -210,6 +211,23 @@ public Meter.Type getType() {
public KeyName[] getKeyNames() {
return CommonTags.values();
}
},

PENDING {
@Override
public String getName() {
return "reactor.pool.pending";
}

@Override
public Meter.Type getType() {
return Meter.Type.TIMER;
}

@Override
public KeyName[] getKeyNames() {
return KeyName.merge(CommonTags.values(), PendingTags.values());
}
};

public enum AllocationTags implements KeyName {
Expand Down Expand Up @@ -257,4 +275,23 @@ public String asString() {
}
}
}

public enum PendingTags implements KeyName {

/**
* Indicates whether the pending acquire operation finished with a {@code success} or {@code failure} i.e.
* whether an allocation operation was triggered or a {@link reactor.pool.PoolAcquireTimeoutException}
* was thrown.
*/
OUTCOME {
@Override
public String asString() {
return "pool.pending.outcome";
}
};

public static final Tag OUTCOME_SUCCESS = Tag.of(OUTCOME.asString(), "success");
public static final Tag OUTCOME_FAILURE = Tag.of(OUTCOME.asString(), "failure");

}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -74,7 +74,9 @@ void metersRegisteredForRecorder() {
"reactor.pool.recycled.notable[tag(pool.name=testRecorder), tag(pool.recycling.path=slow)]",
"reactor.pool.reset[tag(pool.name=testRecorder)]",
"reactor.pool.resources.summary.lifetime[tag(pool.name=testRecorder)]",
"reactor.pool.resources.summary.idleness[tag(pool.name=testRecorder)]"
"reactor.pool.resources.summary.idleness[tag(pool.name=testRecorder)]",
"reactor.pool.pending[tag(pool.name=testRecorder), tag(pool.pending.outcome=success)]",
"reactor.pool.pending[tag(pool.name=testRecorder), tag(pool.pending.outcome=failure)]"
);
}

Expand Down Expand Up @@ -104,7 +106,9 @@ void micrometerInstrumentedPoolRegistersGaugeAndRecorderMetrics() {
"reactor.pool.recycled.notable[tag(pool.name=testMetrics), tag(pool.recycling.path=slow)]",
"reactor.pool.reset[tag(pool.name=testMetrics)]",
"reactor.pool.resources.summary.lifetime[tag(pool.name=testMetrics)]",
"reactor.pool.resources.summary.idleness[tag(pool.name=testMetrics)]"
"reactor.pool.resources.summary.idleness[tag(pool.name=testMetrics)]",
"reactor.pool.pending[tag(pool.name=testMetrics), tag(pool.pending.outcome=success)]",
"reactor.pool.pending[tag(pool.name=testMetrics), tag(pool.pending.outcome=failure)]"
);
}
}
25 changes: 19 additions & 6 deletions reactor-pool/src/main/java/reactor/pool/AbstractPool.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,6 +44,7 @@
* related classes like {@link AbstractPooledRef} or {@link Borrower}.
*
* @author Simon Baslé
* @author Violeta Georgieva
*/
abstract class AbstractPool<POOLABLE> implements InstrumentedPool<POOLABLE>,
InstrumentedPool.PoolMetrics {
Expand Down Expand Up @@ -125,7 +126,7 @@ public boolean isInactiveForMoreThan(Duration duration) {

/**
* Note to implementors: stop the {@link Borrower} countdown by calling
* {@link Borrower#stopPendingCountdown()} as soon as it is known that a resource is
* {@link Borrower#stopPendingCountdown(boolean)} as soon as it is known that a resource is
* available or is in the process of being allocated.
*/
abstract void doAcquire(Borrower<POOLABLE> borrower);
Expand Down Expand Up @@ -382,6 +383,7 @@ public String toString() {
* an {@link AbstractPool}.
*
* @author Simon Baslé
* @author Violeta Georgieva
*/
static final class Borrower<POOLABLE> extends AtomicBoolean implements Scannable, Subscription, Runnable {

Expand All @@ -391,6 +393,7 @@ static final class Borrower<POOLABLE> extends AtomicBoolean implements Scannable
final AbstractPool<POOLABLE> pool;
final Duration pendingAcquireTimeout;

long pendingAcquireStart;
Disposable timeoutTask;

Borrower(CoreSubscriber<? super AbstractPooledRef<POOLABLE>> actual,
Expand All @@ -409,6 +412,8 @@ Context currentContext() {
@Override
public void run() {
if (Borrower.this.compareAndSet(false, true)) {
// this is failure, a timeout was observed
stopPendingCountdown(false);
pool.cancelAcquire(Borrower.this);
actual.onError(new PoolAcquireTimeoutException(pendingAcquireTimeout));
}
Expand All @@ -423,6 +428,7 @@ public void request(long n) {
boolean noPermits = pool.poolConfig.allocationStrategy().estimatePermitCount() == 0;

if (!pendingAcquireTimeout.isZero() && noIdle && noPermits) {
pendingAcquireStart = pool.clock.millis();
timeoutTask = this.pool.config().pendingAcquireTimer().apply(this, pendingAcquireTimeout);
}
//doAcquire should interrupt the countdown if there is either an available
Expand All @@ -434,15 +440,22 @@ public void request(long n) {
/**
* Stop the countdown started when calling {@link AbstractPool#doAcquire(Borrower)}.
*/
void stopPendingCountdown() {
void stopPendingCountdown(boolean success) {
if (!timeoutTask.isDisposed()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming timeoutTask was disposed, what should happen with the pending metric?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the pending metric per connection pool?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just trying to understand - so if a borrower was just timed out, its result can be ignored, while the acquisition will actually account for recording the metric. Do I understand it right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!timeoutTask.isDisposed() this check is to ensure that if you invoke this method twice for one and the same Borrower you will not record twice.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean timeoutTask is disposed here in this method at the end of the method, so we need to guarantee that we record just once.

Copy link
Member Author

@violetagg violetagg Nov 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the task timed out the recording will happen in run method. Is that what you had in mind?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I now see what is happening. In another comment I expressed my concern about it. Now my question would be - is there no race here? Is the timeoutTask delivered serially with the acquisition or does it run on potentially another thread? In such case, perhaps the flip of the Borrower's AtomicBoolean state would be the trigger to account for either success or failure in the transitioning from pending state?

if (success) {
pool.metricsRecorder.recordPendingSuccessAndLatency(pool.clock.millis() - pendingAcquireStart);
} else {
pool.metricsRecorder.recordPendingFailureAndLatency(pool.clock.millis() - pendingAcquireStart);
}
}
timeoutTask.dispose();
}

@Override
public void cancel() {
set(true);
pool.cancelAcquire(this);
stopPendingCountdown();
stopPendingCountdown(true); // this is not failure, the subscription was canceled
}

@Override
Expand All @@ -457,7 +470,7 @@ public Object scanUnsafe(Attr key) {
}

void deliver(AbstractPooledRef<POOLABLE> poolSlot) {
stopPendingCountdown();
stopPendingCountdown(true);
if (get()) {
//CANCELLED
poolSlot.release().subscribe(aVoid -> {}, e -> Operators.onErrorDropped(e, Context.empty())); //actual mustn't receive onError
Expand All @@ -470,7 +483,7 @@ void deliver(AbstractPooledRef<POOLABLE> poolSlot) {
}

void fail(Throwable error) {
stopPendingCountdown();
stopPendingCountdown(false);
if (!get()) {
actual.onError(error);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
* A No-Op {@link PoolMetricsRecorder} that can be used as a default if instrumentation is not desired.
*
* @author Simon Baslé
* @author Violeta Georgieva
*/
final class NoOpPoolMetricsRecorder implements PoolMetricsRecorder {

Expand Down Expand Up @@ -73,4 +74,14 @@ public void recordSlowPath() {
public void recordFastPath() {

}

@Override
public void recordPendingSuccessAndLatency(long latencyMs) {

}

@Override
public void recordPendingFailureAndLatency(long latencyMs) {

}
}
25 changes: 24 additions & 1 deletion reactor-pool/src/main/java/reactor/pool/PoolMetricsRecorder.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@
* responsibility of a {@link java.time.Clock}.
*
* @author Simon Baslé
* @author Violeta Georgieva
*/
public interface PoolMetricsRecorder {

Expand Down Expand Up @@ -77,4 +78,26 @@ public interface PoolMetricsRecorder {
* Record the fact that a {@link Pool} has a fast path of recycling and just used it.
*/
void recordFastPath();

/**
* Record a latency for successful pending acquire operation.
* A successful pending acquire operation is such that triggers an allocation operation.
* Implies incrementing a pending acquire success counter as well.
* @param latencyMs the latency in milliseconds
* @since 1.0.4
*/
default void recordPendingSuccessAndLatency(long latencyMs) {
// noop
}

/**
* Record a latency for failed pending acquire.
* A failed pending acquire operation is such that finishes with {@link PoolAcquireTimeoutException}.
* Implies incrementing a pending acquire failure counter as well.
* @param latencyMs the latency in milliseconds
* @since 1.0.4
*/
default void recordPendingFailureAndLatency(long latencyMs) {
// noop
}
}
4 changes: 2 additions & 2 deletions reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ private void drainLoop() {
borrower.fail(new PoolShutdownException());
return;
}
borrower.stopPendingCountdown();
borrower.stopPendingCountdown(true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually followed by borrower.deliver which calls stopPendingCountdown. Why not rely on the one in deliver then? Is it because of the re-scheduling onto the acquisitionScheduler? Perhaps there should be an abstraction for signalling that pending state is over and what the next step is so that it is encapsulated? It feels unwieldy that the logic is spread over two classes and feels error-prone. Shouldn't this actually cancel the timeout timer instead?

ACQUIRED.incrementAndGet(this);
poolConfig.acquisitionScheduler()
.schedule(() -> borrower.deliver(slot));
Expand Down Expand Up @@ -412,7 +412,7 @@ private void drainLoop() {
borrower.fail(new PoolShutdownException());
return;
}
borrower.stopPendingCountdown();
borrower.stopPendingCountdown(true);
long start = clock.millis();
Mono<POOLABLE> allocator = allocatorWithScheduler();

Expand Down
50 changes: 50 additions & 0 deletions reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@

/**
* @author Simon Baslé
* @author Violeta Georgieva
*/
public class CommonPoolTest {

Expand Down Expand Up @@ -2680,4 +2681,53 @@ void testIssue_174(PoolStyle style) {
assertThat(pool.config().allocationStrategy().estimatePermitCount()).isEqualTo(0);
assertThat(pool.metrics().idleSize()).isEqualTo(10);
}

@ParameterizedTestWithName
@MethodSource("allPools")
@Tag("metrics")
void recordsPendingCountAndLatencies(PoolStyle configAdjuster) {
PoolBuilder<String, ?> builder = PoolBuilder
.from(Mono.defer(() -> Mono.just("foo")))
.metricsRecorder(recorder)
.sizeBetween(0, 1)
.clock(recorder.getClock());
Pool<String> pool = configAdjuster.apply(builder);

//success, acquisition happens immediately
PooledRef<String> pooledRef = pool.acquire(Duration.ofMillis(1)).block(Duration.ofSeconds(1));
assertThat(pooledRef).isNotNull();

//success, acquisition happens after pending some time
pool.acquire(Duration.ofMillis(50)).subscribe();

//error, timed out
pool.acquire(Duration.ofMillis(1))
.as(StepVerifier::create)
.expectError(PoolAcquireTimeoutException.class)
.verify(Duration.ofSeconds(1));

pooledRef.release().block(Duration.ofSeconds(1));

assertThat(recorder.getPendingTotalCount())
.as("total pending")
.isEqualTo(2);
assertThat(recorder.getPendingSuccessCount())
.as("pending success")
.isEqualTo(1)
chemicL marked this conversation as resolved.
Show resolved Hide resolved
.isEqualTo(recorder.getPendingSuccessHistogram().getTotalCount());
assertThat(recorder.getPendingErrorCount())
.as("pending errors")
.isEqualTo(1)
.isEqualTo(recorder.getPendingErrorHistogram().getTotalCount());

long minSuccess = recorder.getPendingSuccessHistogram().getMinValue();
assertThat(minSuccess)
.as("pending success latency")
.isGreaterThanOrEqualTo(1L);

long minError = recorder.getPendingErrorHistogram().getMinValue();
assertThat(minError)
.as("pending error latency")
.isGreaterThanOrEqualTo(1L);
}
}
Loading
Loading