Skip to content

Commit

Permalink
Merge #221 into 1.1.0-M4
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Jun 14, 2024
2 parents 9ea2e15 + 31b0498 commit 3dd8289
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 15 deletions.
14 changes: 1 addition & 13 deletions reactor-pool/src/main/java/reactor/pool/AbstractPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -422,19 +422,7 @@ public void run() {
@Override
public void request(long n) {
if (Operators.validate(n)) {
//start the countdown

boolean noIdle = pool.idleSize() == 0;
boolean noPermits = pool.poolConfig.allocationStrategy().estimatePermitCount() == 0;

if (noIdle && noPermits) {
pendingAcquireStart = pool.clock.millis();
if (!pendingAcquireTimeout.isZero()) {
timeoutTask = this.pool.config().pendingAcquireTimer().apply(this, pendingAcquireTimeout);
}
}
//doAcquire should interrupt the countdown if there is either an available
//resource or the pool can allocate one
// doAcquire will check for acquire timeout
pool.doAcquire(this);
}
}
Expand Down
15 changes: 13 additions & 2 deletions reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -571,7 +571,6 @@ final void maybeRecycleAndDrain(QueuePooledRef<POOLABLE> poolSlot, CoreSubscribe
* @param pending a new {@link reactor.pool.AbstractPool.Borrower} to add to the queue and later either serve or consider pending
*/
void pendingOffer(Borrower<POOLABLE> pending) {
int maxPending = poolConfig.maxPending();
ConcurrentLinkedDeque<Borrower<POOLABLE>> pendingQueue = this.pending;
if (pendingQueue == TERMINATED) {
return;
Expand All @@ -581,7 +580,19 @@ void pendingOffer(Borrower<POOLABLE> pending) {
postOffer = PENDING_SIZE.incrementAndGet(this);
}

int idle = idleSize;
int estimatePermitCount = poolConfig.allocationStrategy().estimatePermitCount();

// This is "best effort"
if (idle + estimatePermitCount < postOffer) {
pending.pendingAcquireStart = clock.millis();
if (!pending.pendingAcquireTimeout.isZero()) {
pending.timeoutTask = config().pendingAcquireTimer().apply(pending, pending.pendingAcquireTimeout);
}
}

if (WIP.getAndIncrement(this) == 0) {
int maxPending = poolConfig.maxPending();
if (maxPending >= 0 && postOffer > maxPending && idleSize == 0 && poolConfig.allocationStrategy().estimatePermitCount() == 0) {
//fail fast. differentiate slightly special case of maxPending == 0
Borrower<POOLABLE> toCull = pendingQueue.pollLast();
Expand Down
32 changes: 32 additions & 0 deletions reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -1117,6 +1119,36 @@ void discardCloseableWhenCloseFailureLogs(PoolStyle configAdjuster) {
}
}

@ParameterizedTestWithName
@MethodSource("allPools")
void pendingTimeout(PoolStyle configAdjuster) throws Exception {
PoolBuilder<String, ?> builder = PoolBuilder
.from(Mono.just("pooled"))
.sizeBetween(0, 1)
.maxPendingAcquire(10);
AbstractPool<String> pool = configAdjuster.apply(builder);

CountDownLatch latch = new CountDownLatch(3);
ExecutorService executorService = Executors.newFixedThreadPool(20);
try {
CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[4];
for (int i = 0; i < completableFutures.length; i++) {
completableFutures[i] = CompletableFuture.runAsync(
() -> pool.acquire(Duration.ofMillis(10))
.doOnError(t -> latch.countDown())
.onErrorResume(PoolAcquireTimeoutException.class, t -> Mono.empty())
.block(),
executorService);
}

CompletableFuture.allOf(completableFutures).join();
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
}
finally {
executorService.shutdown();
}
}

@ParameterizedTestWithName
@MethodSource("allPools")
void pendingTimeoutNotImpactedByLongAllocation(PoolStyle configAdjuster) {
Expand Down

0 comments on commit 3dd8289

Please sign in to comment.