From 01bf4ad9c415f5f239e5ac0a2475fdb517e8131d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Wed, 5 Aug 2020 16:41:19 +0200 Subject: [PATCH 1/2] fix #87 Allow to chose between LRU and MRU for idle resources --- src/main/java/reactor/pool/AbstractPool.java | 2 +- .../java/reactor/pool/DefaultPoolConfig.java | 12 +- src/main/java/reactor/pool/PoolBuilder.java | 70 +- src/main/java/reactor/pool/PoolConfig.java | 11 + .../java/reactor/pool/SimpleDequePool.java | 671 +++++++++++++++ .../java/reactor/pool/SimpleFifoPool.java | 115 --- .../java/reactor/pool/SimpleLifoPool.java | 118 --- src/main/java/reactor/pool/SimplePool.java | 527 ------------ ...lTest.java => AcquireDefaultPoolTest.java} | 168 ++-- .../java/reactor/pool/CommonPoolTest.java | 105 ++- .../pool/PendingAcquireLifoBehaviorTest.java | 263 ++++++ .../java/reactor/pool/SimpleLifoPoolTest.java | 804 ------------------ 12 files changed, 1193 insertions(+), 1673 deletions(-) create mode 100644 src/main/java/reactor/pool/SimpleDequePool.java delete mode 100644 src/main/java/reactor/pool/SimpleFifoPool.java delete mode 100644 src/main/java/reactor/pool/SimpleLifoPool.java delete mode 100644 src/main/java/reactor/pool/SimplePool.java rename src/test/java/reactor/pool/{SimpleFifoPoolTest.java => AcquireDefaultPoolTest.java} (89%) create mode 100644 src/test/java/reactor/pool/PendingAcquireLifoBehaviorTest.java delete mode 100644 src/test/java/reactor/pool/SimpleLifoPoolTest.java diff --git a/src/main/java/reactor/pool/AbstractPool.java b/src/main/java/reactor/pool/AbstractPool.java index c7f8cea2..440d1e77 100644 --- a/src/main/java/reactor/pool/AbstractPool.java +++ b/src/main/java/reactor/pool/AbstractPool.java @@ -106,7 +106,7 @@ public int getMaxPendingAcquireSize() { // == common methods to interact with idle/pending queues == - abstract boolean elementOffer(POOLABLE element); + abstract boolean elementOffer(POOLABLE element); //used in tests /** * Note to implementors: stop the {@link Borrower} countdown by calling diff --git a/src/main/java/reactor/pool/DefaultPoolConfig.java b/src/main/java/reactor/pool/DefaultPoolConfig.java index 42d210ca..327e722d 100644 --- a/src/main/java/reactor/pool/DefaultPoolConfig.java +++ b/src/main/java/reactor/pool/DefaultPoolConfig.java @@ -42,6 +42,7 @@ public class DefaultPoolConfig implements PoolConfig { protected final Scheduler acquisitionScheduler; protected final PoolMetricsRecorder metricsRecorder; protected final Clock clock; + protected final boolean isIdleLRU; public DefaultPoolConfig(Mono allocator, AllocationStrategy allocationStrategy, @@ -51,7 +52,8 @@ public DefaultPoolConfig(Mono allocator, BiPredicate evictionPredicate, Scheduler acquisitionScheduler, PoolMetricsRecorder metricsRecorder, - Clock clock) { + Clock clock, + boolean isIdleLRU) { this.allocator = allocator; this.allocationStrategy = allocationStrategy; this.maxPending = maxPending; @@ -61,6 +63,7 @@ public DefaultPoolConfig(Mono allocator, this.acquisitionScheduler = acquisitionScheduler; this.metricsRecorder = metricsRecorder; this.clock = clock; + this.isIdleLRU = isIdleLRU; } /** @@ -81,6 +84,7 @@ protected DefaultPoolConfig(PoolConfig toCopy) { this.acquisitionScheduler = toCopyDpc.acquisitionScheduler; this.metricsRecorder = toCopyDpc.metricsRecorder; this.clock = toCopyDpc.clock; + this.isIdleLRU = toCopyDpc.isIdleLRU; } else { this.allocator = toCopy.allocator(); @@ -92,6 +96,7 @@ protected DefaultPoolConfig(PoolConfig toCopy) { this.acquisitionScheduler = toCopy.acquisitionScheduler(); 
this.metricsRecorder = toCopy.metricsRecorder(); this.clock = toCopy.clock(); + this.isIdleLRU = toCopy.reuseIdleResourcesInLruOrder(); } } @@ -139,4 +144,9 @@ public PoolMetricsRecorder metricsRecorder() { public Clock clock() { return this.clock; } + + @Override + public boolean reuseIdleResourcesInLruOrder() { + return isIdleLRU; + } } diff --git a/src/main/java/reactor/pool/PoolBuilder.java b/src/main/java/reactor/pool/PoolBuilder.java index 7d204cf5..d9de2e36 100644 --- a/src/main/java/reactor/pool/PoolBuilder.java +++ b/src/main/java/reactor/pool/PoolBuilder.java @@ -70,6 +70,7 @@ public static PoolBuilder> from(Publisher allo Scheduler acquisitionScheduler = Schedulers.immediate(); Clock clock = Clock.systemUTC(); PoolMetricsRecorder metricsRecorder = NoOpPoolMetricsRecorder.INSTANCE; + boolean idleLruOrder = true; PoolBuilder(Mono allocator, Function, CONF> configModifier) { this.allocator = allocator; @@ -88,6 +89,7 @@ public static PoolBuilder> from(Publisher allo this.acquisitionScheduler = source.acquisitionScheduler; this.metricsRecorder = source.metricsRecorder; this.clock = source.clock; + this.idleLruOrder = source.idleLruOrder; } /** @@ -285,12 +287,52 @@ public PoolBuilder sizeUnbounded() { return allocationStrategy(new AllocationStrategies.UnboundedAllocationStrategy()); } + /** + * Configure the pool so that if there are idle resources (ie pool is under-utilized), + * the next {@link Pool#acquire()} will get the Least Recently Used resource + * (LRU, ie. the resource that was released first among the current idle resources). + * + * @return this {@link Pool} builder + */ + public PoolBuilder idleResourceReuseLruOrder() { + return idleResourceReuseOrder(true); + } + + /** + * Configure the pool so that if there are idle resources (ie pool is under-utilized), + * the next {@link Pool#acquire()} will get the Most Recently Used resource + * (MRU, ie. the resource that was released last among the current idle resources). + * + * @return this {@link Pool} builder + */ + public PoolBuilder idleResourceReuseMruOrder() { + return idleResourceReuseOrder(false); + } + + /** + * Configure the order in which idle resources are used when the next {@link Pool#acquire()} + * is performed (while the pool is under-utilized). Allows to chose between + * the Least Recently Used order when {@code true} (LRU, ie. the resource that + * was released first among the current idle resources, the default) and + * Most Recently Used order (MRU, ie. the resource that was released last among + * the current idle resources). + * + * @param isLru {@code true} for LRU (the default) or {@code false} for MRU + * @return this {@link Pool} builder + * @see #idleResourceReuseLruOrder() + * @see #idleResourceReuseMruOrder() + */ + public PoolBuilder idleResourceReuseOrder(boolean isLru) { + this.idleLruOrder = isLru; + return this; + } + /** * Add implementation-specific configuration, changing the type of {@link PoolConfig} * passed to the {@link Pool} factory in {@link #build(Function)}. 
* * @param configModifier {@link Function} to transform the type of {@link PoolConfig} - * create by this builder for the benefit of the pool factory, allowing for custom + * created by this builder for the benefit of the pool factory, allowing for custom * implementations with custom configurations * @param new type for the configuration * @return a new PoolBuilder that now produces a different type of {@link PoolConfig} @@ -299,26 +341,41 @@ public > PoolBuilder extraConfiguration(Fu return new PoolBuilder<>(this, this.configModifier.andThen(configModifier)); } + /** + * Construct a default reactor pool with the builder's configuration. + * + * @return an {@link InstrumentedPool} + */ + public InstrumentedPool buildPool() { + return new SimpleDequePool<>(this.buildConfig(), true); + } + /** * Build a LIFO flavor of {@link Pool}, that is to say a flavor where the last * {@link Pool#acquire()} {@link Mono Mono} that was pending is served first * whenever a resource becomes available. + *
+ * <p>
+ * This is different from the {@link #idleResourceReuseOrder(boolean) idle resource reuse order}, + * which is used when resources ARE available at the instant the {@link Pool#acquire()} is attempted. * - * @return a builder of {@link Pool} with LIFO pending acquire ordering + * @return a {@link Pool} with LIFO pending acquire ordering */ public InstrumentedPool lifo() { - return build(SimpleLifoPool::new); + return new SimpleDequePool<>(this.buildConfig(), false); } /** * Build the default flavor of {@link Pool}, which has FIFO semantics on pending * {@link Pool#acquire()} {@link Mono Mono}, serving the oldest pending acquire first * whenever a resource becomes available. + *
+ * <p>
+ * This is different from the {@link #idleResourceReuseOrder(boolean) idle resource reuse order}, + * which is used when resources ARE available at the instant the {@link Pool#acquire()} is attempted. * - * @return a builder of {@link Pool} with FIFO pending acquire ordering + * @return a {@link Pool} with FIFO pending acquire ordering */ public InstrumentedPool fifo() { - return build(SimpleFifoPool::new); + return buildPool(); } /** @@ -345,7 +402,8 @@ CONF buildConfig() { evictionPredicate, acquisitionScheduler, metricsRecorder, - clock); + clock, + idleLruOrder); return this.configModifier.apply(baseConfig); } diff --git a/src/main/java/reactor/pool/PoolConfig.java b/src/main/java/reactor/pool/PoolConfig.java index 7ddf12ca..3924aa01 100644 --- a/src/main/java/reactor/pool/PoolConfig.java +++ b/src/main/java/reactor/pool/PoolConfig.java @@ -98,4 +98,15 @@ public interface PoolConfig { */ Clock clock(); + /** + * The order in which idle (aka available) resources should be used when the pool was + * under-utilized and a new {@link Pool#acquire()} is performed. Returns {@code true} + * if LRU (Least-Recently Used, the resource that was released first is emitted) or + * {@code false} for MRU (Most-Recently Used, the resource that was released last is + * emitted). + * + * @return {@code true} for LRU, {@code false} for MRU + */ + boolean reuseIdleResourcesInLruOrder(); + } diff --git a/src/main/java/reactor/pool/SimpleDequePool.java b/src/main/java/reactor/pool/SimpleDequePool.java new file mode 100644 index 00000000..4e78a9c3 --- /dev/null +++ b/src/main/java/reactor/pool/SimpleDequePool.java @@ -0,0 +1,671 @@ +/* + * Copyright (c) 2018-Present Pivotal Software Inc, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.pool; + +import java.time.Duration; +import java.util.Deque; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; + +import reactor.core.CoreSubscriber; +import reactor.core.Scannable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Operators; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import reactor.util.Loggers; +import reactor.util.annotation.Nullable; + +/** + * The {@link SimpleDequePool} is based on {@link Deque} for idle resources and pending {@link Pool#acquire()} Monos, + * allowing both to be ordered either LIFO or FIFO. + * It uses non-blocking drain loops to deliver resources to borrowers, which means that a resource could + * be handed off on any of the following {@link Thread threads}: + *
+ * <ul>
+ *     <li>any thread on which a resource was last allocated</li>
+ *     <li>any thread on which a resource was recently released</li>
+ *     <li>any thread on which an {@link Pool#acquire()} {@link Mono} was subscribed</li>
+ * </ul>
+ * For a more deterministic approach, the {@link PoolBuilder#acquisitionScheduler(Scheduler)} property of the builder can be used. + * + * @author Simon Baslé + */ +public class SimpleDequePool extends AbstractPool { + + @SuppressWarnings("rawtypes") + private static final ConcurrentLinkedDeque TERMINATED = new ConcurrentLinkedDeque(); + + final boolean idleResourceLeastRecentlyUsed; + final boolean pendingBorrowerFirstInFirstServed; + + volatile Deque> idleResources; + @SuppressWarnings("rawtypes") + protected static final AtomicReferenceFieldUpdater IDLE_RESOURCES = + AtomicReferenceFieldUpdater.newUpdater(SimpleDequePool.class, Deque.class, "idleResources"); + + volatile int acquired; + @SuppressWarnings("rawtypes") + private static final AtomicIntegerFieldUpdater ACQUIRED = + AtomicIntegerFieldUpdater.newUpdater(SimpleDequePool.class, "acquired"); + + volatile int wip; + @SuppressWarnings("rawtypes") + private static final AtomicIntegerFieldUpdater WIP = + AtomicIntegerFieldUpdater.newUpdater(SimpleDequePool.class, "wip"); + + volatile ConcurrentLinkedDeque> pending; + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater PENDING = + AtomicReferenceFieldUpdater.newUpdater(SimpleDequePool.class, ConcurrentLinkedDeque.class, "pending"); + + SimpleDequePool(PoolConfig poolConfig, boolean pendingBorrowerFirstInFirstServed) { + super(poolConfig, Loggers.getLogger(SimpleDequePool.class)); + this.idleResourceLeastRecentlyUsed = poolConfig.reuseIdleResourcesInLruOrder(); + this.pendingBorrowerFirstInFirstServed = pendingBorrowerFirstInFirstServed; + this.pending = new ConcurrentLinkedDeque<>(); //unbounded + this.idleResources = new ConcurrentLinkedDeque<>(); + } + + @Override + public Mono> acquire() { + return new QueueBorrowerMono<>(this, + Duration.ZERO); //the mono is unknown to the pool until requested + } + + @Override + public Mono> acquire(Duration timeout) { + return new QueueBorrowerMono<>(this, + timeout); //the mono is unknown to the pool until requested + } + + @Override + public int acquiredSize() { + return acquired; + } + + @Override + public Mono disposeLater() { + return Mono.defer(() -> { + @SuppressWarnings("unchecked") ConcurrentLinkedDeque> q = + PENDING.getAndSet(this, TERMINATED); + if (q != TERMINATED) { + Borrower p; + while ((p = q.pollFirst()) != null) { + p.fail(new PoolShutdownException()); + } + + @SuppressWarnings("unchecked") + Queue> e = + IDLE_RESOURCES.getAndSet(this, null); + if (e != null) { + Mono destroyMonos = Mono.empty(); + while (!e.isEmpty()) { + QueuePooledRef ref = e.poll(); + if (ref.markInvalidate()) { + destroyMonos = destroyMonos.and(destroyPoolable(ref)); + } + } + return destroyMonos; + } + } + return Mono.empty(); + }); + } + + @Override + public int idleSize() { + Queue e = IDLE_RESOURCES.get(this); + return e == null ? 
0 : e.size(); + } + + @Override + public Mono warmup() { + if (poolConfig.allocationStrategy() + .permitMinimum() > 0) { + return Mono.defer(() -> { + int initSize = poolConfig.allocationStrategy() + .getPermits(0); + @SuppressWarnings({ "unchecked", "rawtypes" }) //rawtypes added since javac actually complains + Mono[] allWarmups = new Mono[initSize]; + for (int i = 0; i < initSize; i++) { + long start = clock.millis(); + allWarmups[i] = poolConfig.allocator() + .doOnNext(p -> { + metricsRecorder.recordAllocationSuccessAndLatency( + clock.millis() - start); + //the pool slot won't access this pool instance until after it has been constructed + this.idleResources.offerLast(createSlot(p)); + }) + .doOnError(e -> { + metricsRecorder.recordAllocationFailureAndLatency( + clock.millis() - start); + poolConfig.allocationStrategy() + .returnPermits(1); + }); + } + return Flux.concat(allWarmups) + .reduce(0, (count, p) -> count + 1); + }); + } + else { + return Mono.just(0); + } + } + + @Override + void cancelAcquire(Borrower borrower) { + if (!isDisposed()) { //ignore pool disposed + ConcurrentLinkedDeque> q = this.pending; + if (q.remove(borrower)) { + PENDING_COUNT.decrementAndGet(this); + } + } + } + + QueuePooledRef createSlot(POOLABLE element) { + return new QueuePooledRef<>(this, element); + } + + @Override + void doAcquire(Borrower borrower) { + if (isDisposed()) { + borrower.fail(new PoolShutdownException()); + return; + } + + pendingOffer(borrower); + drain(); + } + + void drain() { + if (WIP.getAndIncrement(this) == 0) { + drainLoop(); + } + } + + private void drainLoop() { + for (; ; ) { + @SuppressWarnings("unchecked") + Deque> irq = IDLE_RESOURCES.get(this); + if (irq != null) { + int availableCount = irq.size(); + int pendingCount = PENDING_COUNT.get(this); + int estimatedPermitCount = poolConfig.allocationStrategy() + .estimatePermitCount(); + + if (availableCount == 0) { + if (pendingCount > 0 && estimatedPermitCount > 0) { + final Borrower borrower = pendingPoll(); //shouldn't be null + if (borrower == null) { + continue; + } + ACQUIRED.incrementAndGet(this); + int permits = poolConfig.allocationStrategy() + .getPermits(1); + if (borrower.get() || permits == 0) { + ACQUIRED.decrementAndGet(this); + continue; + } + borrower.stopPendingCountdown(); + long start = clock.millis(); + Mono allocator; + Scheduler s = poolConfig.acquisitionScheduler(); + if (s != Schedulers.immediate()) { + allocator = poolConfig.allocator() + .publishOn(s); + } + else { + allocator = poolConfig.allocator(); + } + Mono primary = allocator.doOnEach(sig -> { + if (sig.isOnNext()) { + POOLABLE newInstance = sig.get(); + metricsRecorder.recordAllocationSuccessAndLatency(clock.millis() - start); + borrower.deliver(createSlot(newInstance)); + } + else if (sig.isOnError()) { + metricsRecorder.recordAllocationFailureAndLatency(clock.millis() - start); + ACQUIRED.decrementAndGet(this); + poolConfig.allocationStrategy() + .returnPermits(1); + borrower.fail(sig.getThrowable()); + drain(); + } + }); + + int toWarmup = permits - 1; + if (toWarmup < 1) { + primary.subscribe(alreadyPropagated -> { }, alreadyPropagatedOrLogged -> { }); + } + else { + logger.debug("should warm up {} extra resources", toWarmup); + final long startWarmupIteration = clock.millis(); + final Mono warmup = Flux + .range(1, toWarmup) + .flatMap(i -> allocator + .doOnNext( + poolable -> { + logger.debug("warmed up extra resource {}/{}", i, toWarmup); + metricsRecorder.recordAllocationSuccessAndLatency( + clock.millis() - startWarmupIteration); + 
irq.offer(new QueuePooledRef<>(this, poolable)); + drain(); + }) + .onErrorResume( + warmupError -> { + logger.debug("failed to warm up extra resource {}/{}: {}", i, toWarmup, + warmupError.toString()); + metricsRecorder.recordAllocationFailureAndLatency( + clock.millis() - startWarmupIteration); + //we return permits in case of warmup failure, but shouldn't further decrement ACQUIRED + poolConfig.allocationStrategy().returnPermits(1); + drain(); + return Mono.empty(); + })) + .then(); + + //we ignore errors from primary (already propagated), which allows us to attempt + // the warmup in all cases. individual warmup failures decrement the permit and are logged + primary.onErrorResume(ignore -> Mono.empty()) + .thenMany(warmup) + //all errors and values are either propagated or logged, nothing to do + .subscribe(alreadyPropagated -> { }, alreadyPropagatedOrLogged -> { }); + } + } + } + else if (pendingCount > 0) { + if (isDisposed()) { + continue; + } + //there are objects ready and unclaimed in the pool + a pending + QueuePooledRef slot = idleResourceLeastRecentlyUsed ? irq.pollFirst() : irq.pollLast(); + if (slot == null) { + continue; + } + + if (poolConfig.evictionPredicate() + .test(slot.poolable, slot) && slot.markInvalidate()) { + destroyPoolable(slot).subscribe(null, + error -> drain(), + this::drain); + continue; + } + + //there is a party currently pending acquiring + Borrower inner = pendingPoll(); + if (inner == null) { + if (!isDisposed()) { + //put back at the same end + if (idleResourceLeastRecentlyUsed) { + irq.offerFirst(slot); + } + else { + irq.offerLast(slot); + } + } + continue; + } + inner.stopPendingCountdown(); + ACQUIRED.incrementAndGet(this); + poolConfig.acquisitionScheduler() + .schedule(() -> inner.deliver(slot)); + } + } + + if (WIP.decrementAndGet(this) == 0) { + break; + } + } + } + + @Override + boolean elementOffer(POOLABLE element) { + @SuppressWarnings("unchecked") + Deque> irq = IDLE_RESOURCES.get(this); + if (irq == null) { + return false; + } + return irq.offerLast(createSlot(element)); + } + + @SuppressWarnings("WeakerAccess") + final void maybeRecycleAndDrain(QueuePooledRef poolSlot) { + if (!isDisposed()) { + if (!poolConfig.evictionPredicate() + .test(poolSlot.poolable, poolSlot)) { + metricsRecorder.recordRecycled(); + @SuppressWarnings("unchecked") + Deque> irq = IDLE_RESOURCES.get(this); + if (irq != null) { + QueuePooledRef slot = recycleSlot(poolSlot); + irq.offerLast(slot); + drain(); + if (isDisposed() && slot.markInvalidate()) { + destroyPoolable(slot).subscribe(); //TODO manage errors? + } + return; + } + } + } + if (poolSlot.markInvalidate()) { + destroyPoolable(poolSlot).subscribe(null, + e -> drain(), + this::drain); //TODO manage errors? 
+ } + } + + /** + * @param pending a new {@link reactor.pool.AbstractPool.Borrower} to register as pending + * + * @return true if the pool had capacity to register this new pending + */ + boolean pendingOffer(Borrower pending) { + int maxPending = poolConfig.maxPending(); + for (; ; ) { + int currentPending = PENDING_COUNT.get(this); + if (maxPending >= 0 && currentPending == maxPending) { + pending.fail(new PoolAcquirePendingLimitException(maxPending)); + return false; + } + else if (PENDING_COUNT.compareAndSet(this, + currentPending, + currentPending + 1)) { + this.pending.offerLast(pending); //unbounded + return true; + } + } + } + + /** + * @return the next {@link reactor.pool.AbstractPool.Borrower} to serve + */ + @Nullable + Borrower pendingPoll() { + Deque> pq = pending; + Borrower b = this.pendingBorrowerFirstInFirstServed ? + pq.pollFirst() : + pq.pollLast(); + if (b != null) { + PENDING_COUNT.decrementAndGet(this); + } + return b; + } + + QueuePooledRef recycleSlot( + QueuePooledRef slot) { + return new QueuePooledRef<>(slot); + } + + @Override + public boolean isDisposed() { + return PENDING.get(this) == TERMINATED; + } + + + + + static final class QueuePooledRef extends AbstractPooledRef { + + final SimpleDequePool pool; + + QueuePooledRef(SimpleDequePool pool, T poolable) { + super(poolable, pool.metricsRecorder, pool.clock); + this.pool = pool; + } + + QueuePooledRef(QueuePooledRef oldRef) { + super(oldRef); + this.pool = oldRef.pool; + } + + @Override + public Mono invalidate() { + return Mono.defer(() -> { + if (markInvalidate()) { + //immediately clean up state + ACQUIRED.decrementAndGet(pool); + return pool.destroyPoolable(this) + .then(Mono.fromRunnable(pool::drain)); + } + else { + return Mono.empty(); + } + }); + } + + @Override + public Mono release() { + return Mono.defer(() -> { + if (STATE.get(this) == STATE_RELEASED) { + return Mono.empty(); + } + + if (pool.isDisposed()) { + ACQUIRED.decrementAndGet(pool); //immediately clean up state + if (markInvalidate()) { + return pool.destroyPoolable(this); + } + else { + return Mono.empty(); + } + } + + Publisher cleaner; + try { + cleaner = pool.poolConfig.releaseHandler() + .apply(poolable); + } + catch (Throwable e) { + ACQUIRED.decrementAndGet(pool); //immediately clean up state + markReleased(); + return Mono.error(new IllegalStateException( + "Couldn't apply cleaner function", + e)); + } + //the PoolRecyclerMono will wrap the cleaning Mono returned by the Function and perform state updates + return new QueuePoolRecyclerMono<>(cleaner, this); + }); + } + } + + + + static final class QueueBorrowerMono extends Mono> { + + final SimpleDequePool parent; + final Duration acquireTimeout; + + QueueBorrowerMono(SimpleDequePool pool, Duration acquireTimeout) { + this.parent = pool; + this.acquireTimeout = acquireTimeout; + } + + @Override + public void subscribe(CoreSubscriber> actual) { + Objects.requireNonNull(actual, "subscribing with null"); + Borrower borrower = new Borrower<>(actual, parent, acquireTimeout); + actual.onSubscribe(borrower); + } + } + + private static final class QueuePoolRecyclerInner + implements CoreSubscriber, Scannable, Subscription { + + final CoreSubscriber actual; + final SimpleDequePool pool; + + //poolable can be checked for null to protect against protocol errors + QueuePooledRef pooledRef; + Subscription upstream; + long start; + + //once protects against multiple requests + volatile int once; + QueuePoolRecyclerInner(CoreSubscriber actual, + QueuePooledRef pooledRef) { + this.actual = actual; 
+ this.pooledRef = Objects.requireNonNull(pooledRef, "pooledRef"); + this.pool = pooledRef.pool; + } + + @Override + public void cancel() { + //NO-OP, once requested, release cannot be cancelled + } + + @Override + public void onComplete() { + QueuePooledRef slot = pooledRef; + pooledRef = null; + if (slot == null) { + return; + } + + //some operators might immediately produce without request (eg. fromRunnable) + // we decrement ACQUIRED EXACTLY ONCE to indicate that the poolable was released by the user + if (ONCE.compareAndSet(this, 0, 1)) { + ACQUIRED.decrementAndGet(pool); + } + + pool.metricsRecorder.recordResetLatency(pool.clock.millis() - start); + + pool.maybeRecycleAndDrain(slot); + actual.onComplete(); + } + + @Override + public void onError(Throwable throwable) { + QueuePooledRef slot = pooledRef; + pooledRef = null; + if (slot == null) { + Operators.onErrorDropped(throwable, actual.currentContext()); + return; + } + + //some operators might immediately produce without request (eg. fromRunnable) + // we decrement ACQUIRED EXACTLY ONCE to indicate that the poolable was released by the user + if (ONCE.compareAndSet(this, 0, 1)) { + ACQUIRED.decrementAndGet(pool); + } + + //TODO should we separate reset errors? + pool.metricsRecorder.recordResetLatency(pool.clock.millis() - start); + + if (slot.markInvalidate()) { + pool.destroyPoolable(slot) + .subscribe(null, null, pool::drain); //TODO manage errors? + } + + actual.onError(throwable); + } + + @Override + public void onNext(Void o) { + //N/A + } + + @Override + public void onSubscribe(Subscription s) { + if (Operators.validate(upstream, s)) { + this.upstream = s; + actual.onSubscribe(this); + this.start = pool.clock.millis(); + } + } + + @Override + public void request(long l) { + if (Operators.validate(l)) { + upstream.request(l); + // we decrement ACQUIRED EXACTLY ONCE to indicate that the poolable was released by the user + if (ONCE.compareAndSet(this, 0, 1)) { + ACQUIRED.decrementAndGet(pool); + } + } + } + + @Override + @SuppressWarnings("rawtypes") + public Object scanUnsafe(Scannable.Attr key) { + if (key == Attr.ACTUAL) { + return actual; + } + if (key == Attr.PARENT) { + return upstream; + } + if (key == Attr.CANCELLED) { + return false; + } + if (key == Attr.TERMINATED) { + return pooledRef == null; + } + if (key == Attr.BUFFERED) { + return (pooledRef == null) ? 
0 : 1; + } + return null; + } + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater ONCE = + AtomicIntegerFieldUpdater.newUpdater(QueuePoolRecyclerInner.class, "once"); + } + + private static final class QueuePoolRecyclerMono extends Mono + implements Scannable { + + final Publisher source; + final AtomicReference> slotRef; + + QueuePoolRecyclerMono(Publisher source, QueuePooledRef poolSlot) { + this.source = source; + this.slotRef = new AtomicReference<>(poolSlot); + } + + @Override + @Nullable + @SuppressWarnings("rawtypes") + public Object scanUnsafe(Attr key) { + if (key == Attr.PREFETCH) { + return Integer.MAX_VALUE; + } + if (key == Attr.PARENT) { + return source; + } + return null; + } + + @Override + public void subscribe(CoreSubscriber actual) { + QueuePooledRef slot = slotRef.getAndSet(null); + if (slot == null || !slot.markReleased()) { + Operators.complete(actual); + } + else { + QueuePoolRecyclerInner qpr = + new QueuePoolRecyclerInner<>(actual, slot); + source.subscribe(qpr); + } + } + } +} diff --git a/src/main/java/reactor/pool/SimpleFifoPool.java b/src/main/java/reactor/pool/SimpleFifoPool.java deleted file mode 100644 index 34a758e6..00000000 --- a/src/main/java/reactor/pool/SimpleFifoPool.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright (c) 2018-Present Pivotal Software Inc, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.pool; - -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -import reactor.core.publisher.Mono; -import reactor.util.concurrent.Queues; - -/** - * This implementation is based on MPMC queues for both idle resources and pending {@link Pool#acquire()} Monos, - * resulting in serving pending borrowers in FIFO order. - * - * See {@link SimplePool} for other characteristics of the simple pool. 
- * - * @author Simon Baslé - */ -final class SimpleFifoPool extends SimplePool { - - @SuppressWarnings("rawtypes") - private static final Queue TERMINATED = Queues.empty().get(); - - volatile Queue> pending; - @SuppressWarnings("rawtypes") - private static final AtomicReferenceFieldUpdater PENDING = AtomicReferenceFieldUpdater.newUpdater( - SimpleFifoPool.class, Queue.class, "pending"); - - public SimpleFifoPool(PoolConfig poolConfig) { - super(poolConfig); - this.pending = new ConcurrentLinkedQueue<>(); //unbounded MPMC - } - - @Override - boolean pendingOffer(Borrower pending) { - int maxPending = poolConfig.maxPending(); - for (;;) { - int currentPending = PENDING_COUNT.get(this); - if (maxPending >= 0 && currentPending == maxPending) { - pending.fail(new PoolAcquirePendingLimitException(maxPending)); - return false; - } - else if (PENDING_COUNT.compareAndSet(this, currentPending, currentPending + 1)) { - this.pending.offer(pending); //unbounded - return true; - } - } - } - - @Override - Borrower pendingPoll() { - Queue> q = this.pending; - Borrower b = q.poll(); - if (b != null) PENDING_COUNT.decrementAndGet(this); - return b; - } - - @Override - void cancelAcquire(Borrower borrower) { - if (!isDisposed()) { //ignore pool disposed - Queue> q = this.pending; - if (q.remove(borrower)) { - PENDING_COUNT.decrementAndGet(this); - } - } - } - - @Override - public Mono disposeLater() { - return Mono.defer(() -> { - @SuppressWarnings("unchecked") - Queue> q = PENDING.getAndSet(this, TERMINATED); - if (q != TERMINATED) { - Borrower nextPending; - while((nextPending = q.poll()) != null) { - nextPending.fail(new PoolShutdownException()); - } - - @SuppressWarnings("unchecked") - Queue> e = ELEMENTS.getAndSet(this, null); - if (e != null) { - Mono destroyMonos = Mono.empty(); - while (!e.isEmpty()) { - QueuePooledRef ref = e.poll(); - if (ref.markInvalidate()) { - destroyMonos = destroyMonos.and(destroyPoolable(ref)); - } - } - return destroyMonos; - } - } - return Mono.empty(); - }); - } - - @Override - public boolean isDisposed() { - return PENDING.get(this) == TERMINATED; - } - -} diff --git a/src/main/java/reactor/pool/SimpleLifoPool.java b/src/main/java/reactor/pool/SimpleLifoPool.java deleted file mode 100644 index 0c1fa24a..00000000 --- a/src/main/java/reactor/pool/SimpleLifoPool.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright (c) 2018-Present Pivotal Software Inc, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package reactor.pool; - -import java.util.Deque; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -import reactor.core.publisher.Mono; - -/** - * This implementation is based on {@link java.util.concurrent.ConcurrentLinkedQueue} MPMC queue - * for idle resources and a {@link ConcurrentLinkedDeque} for pending {@link Pool#acquire()} - * Monos, used as a stack ({@link java.util.Deque#offerFirst(Object)}, - * {@link Deque#pollFirst()}. 
This results in serving pending borrowers in LIFO order. - * - * See {@link SimplePool} for other characteristics of the simple pool. - * - * @author Simon Baslé - */ -final class SimpleLifoPool extends SimplePool { - - @SuppressWarnings("rawtypes") - private static final ConcurrentLinkedDeque TERMINATED = new ConcurrentLinkedDeque(); - - volatile ConcurrentLinkedDeque> pending; - @SuppressWarnings("rawtypes") - private static final AtomicReferenceFieldUpdater PENDING = AtomicReferenceFieldUpdater.newUpdater( - SimpleLifoPool.class, ConcurrentLinkedDeque.class, "pending"); - - public SimpleLifoPool(PoolConfig poolConfig) { - super(poolConfig); - this.pending = new ConcurrentLinkedDeque<>(); //unbounded - } - - @Override - boolean pendingOffer(Borrower pending) { - int maxPending = poolConfig.maxPending(); - for (;;) { - int currentPending = PENDING_COUNT.get(this); - if (maxPending >= 0 && currentPending == maxPending) { - pending.fail(new PoolAcquirePendingLimitException(maxPending)); - return false; - } - else if (PENDING_COUNT.compareAndSet(this, currentPending, currentPending + 1)) { - this.pending.offerFirst(pending); //unbounded - return true; - } - } - } - - @Override - Borrower pendingPoll() { - ConcurrentLinkedDeque> q = this.pending; - Borrower b = q.pollFirst(); - if (b != null) PENDING_COUNT.decrementAndGet(this); - return b; - } - - @Override - void cancelAcquire(Borrower borrower) { - if (!isDisposed()) { //ignore pool disposed - ConcurrentLinkedDeque> q = this.pending; - if (q.remove(borrower)) { - PENDING_COUNT.decrementAndGet(this); - } - } - } - - @Override - public Mono disposeLater() { - return Mono.defer(() -> { - @SuppressWarnings("unchecked") - ConcurrentLinkedDeque> q = PENDING.getAndSet(this, TERMINATED); - if (q != TERMINATED) { - Borrower p; - while((p = q.pollFirst()) != null) { - p.fail(new PoolShutdownException()); - } - - @SuppressWarnings("unchecked") - Queue> e = ELEMENTS.getAndSet(this, null); - if (e != null) { - Mono destroyMonos = Mono.empty(); - while (!e.isEmpty()) { - QueuePooledRef ref = e.poll(); - if (ref.markInvalidate()) { - destroyMonos = destroyMonos.and(destroyPoolable(ref)); - } - } - return destroyMonos; - } - } - return Mono.empty(); - }); - } - - @Override - public boolean isDisposed() { - return PENDING.get(this) == TERMINATED; - } - -} diff --git a/src/main/java/reactor/pool/SimplePool.java b/src/main/java/reactor/pool/SimplePool.java deleted file mode 100644 index 0c9edcba..00000000 --- a/src/main/java/reactor/pool/SimplePool.java +++ /dev/null @@ -1,527 +0,0 @@ -/* - * Copyright (c) 2018-Present Pivotal Software Inc, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package reactor.pool; - -import java.time.Duration; -import java.util.Objects; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscription; - -import reactor.core.CoreSubscriber; -import reactor.core.Scannable; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.Operators; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; -import reactor.util.Loggers; -import reactor.util.annotation.Nullable; -import reactor.util.concurrent.Queues; - -/** - * The {@link SimplePool} is based on queues for idle resources and FIFO or LIFO data structures for - * pending {@link Pool#acquire()} Monos. - * It uses non-blocking drain loops to deliver resources to borrowers, which means that a resource could - * be handed off on any of the following {@link Thread threads}: - *
- * <ul>
- *     <li>any thread on which a resource was last allocated</li>
- *     <li>any thread on which a resource was recently released</li>
- *     <li>any thread on which an {@link Pool#acquire()} {@link Mono} was subscribed</li>
- * </ul>
- * For a more deterministic approach, the {@link PoolBuilder#acquisitionScheduler(Scheduler)} property of the builder can be used. - * - * @author Simon Baslé - */ -abstract class SimplePool extends AbstractPool { - - volatile Queue> elements; - @SuppressWarnings("rawtypes") - protected static final AtomicReferenceFieldUpdater ELEMENTS = AtomicReferenceFieldUpdater.newUpdater( - SimplePool.class, Queue.class, "elements"); - - volatile int acquired; - @SuppressWarnings("rawtypes") - private static final AtomicIntegerFieldUpdater ACQUIRED = AtomicIntegerFieldUpdater.newUpdater( - SimplePool.class, "acquired"); - - volatile int wip; - @SuppressWarnings("rawtypes") - private static final AtomicIntegerFieldUpdater WIP = AtomicIntegerFieldUpdater.newUpdater( - SimplePool.class, "wip"); - - - SimplePool(PoolConfig poolConfig) { - super(poolConfig, Loggers.getLogger(SimplePool.class)); - this.elements = Queues.>unboundedMultiproducer().get(); - } - - @Override - public Mono warmup() { - if (poolConfig.allocationStrategy().permitMinimum() > 0) { - return Mono.defer(() -> { - int initSize = poolConfig.allocationStrategy().getPermits(0); - @SuppressWarnings({"unchecked", "rawtypes"}) - Mono[] allWarmups = new Mono[initSize]; - for (int i = 0; i < initSize; i++) { - long start = clock.millis(); - allWarmups[i] = poolConfig - .allocator() - .doOnNext(p -> { - metricsRecorder.recordAllocationSuccessAndLatency(clock.millis() - start); - //the pool slot won't access this pool instance until after it has been constructed - this.elements.offer(createSlot(p)); - }) - .doOnError(e -> { - metricsRecorder.recordAllocationFailureAndLatency(clock.millis() - start); - poolConfig.allocationStrategy().returnPermits(1); - }); - } - return Flux.concat(allWarmups) - .reduce(0, (count, p) -> count + 1); - }); - } - else { - return Mono.just(0); - } - } - - /** - * @return the next {@link reactor.pool.AbstractPool.Borrower} to serve - */ - @Nullable - abstract Borrower pendingPoll(); - - /** - * @param pending a new {@link reactor.pool.AbstractPool.Borrower} to register as pending - * @return true if the pool had capacity to register this new pending - */ - abstract boolean pendingOffer(Borrower pending); - - @Override - public Mono> acquire() { - return new QueueBorrowerMono<>(this, Duration.ZERO); //the mono is unknown to the pool until requested - } - - @Override - public Mono> acquire(Duration timeout) { - return new QueueBorrowerMono<>(this, timeout); //the mono is unknown to the pool until requested - } - - @Override - void doAcquire(Borrower borrower) { - if (isDisposed()) { - borrower.fail(new PoolShutdownException()); - return; - } - - pendingOffer(borrower); - drain(); - } - - @Override - boolean elementOffer(POOLABLE element) { - @SuppressWarnings("unchecked") - Queue> e = ELEMENTS.get(this); - if (e == null) { - return false; - } - return e.offer(createSlot(element)); - } - - QueuePooledRef createSlot(POOLABLE element) { - return new QueuePooledRef<>(this, element); - } - - QueuePooledRef recycleSlot(QueuePooledRef slot) { - return new QueuePooledRef<>(slot); - } - - @Override - public int idleSize() { - Queue e = ELEMENTS.get(this); - return e == null ? 
0 : e.size(); - } - - @SuppressWarnings("WeakerAccess") - final void maybeRecycleAndDrain(QueuePooledRef poolSlot) { - if (!isDisposed()) { - if (!poolConfig.evictionPredicate().test(poolSlot.poolable, poolSlot)) { - metricsRecorder.recordRecycled(); - @SuppressWarnings("unchecked") - Queue> e = ELEMENTS.get(this); - if (e != null) { - QueuePooledRef slot = recycleSlot(poolSlot); - e.offer(slot); - drain(); - if (isDisposed() && slot.markInvalidate()) { - destroyPoolable(slot).subscribe(); //TODO manage errors? - } - return; - } - } - } - if (poolSlot.markInvalidate()) { - destroyPoolable(poolSlot).subscribe(null, e -> drain(), this::drain); //TODO manage errors? - } - } - - void drain() { - if (WIP.getAndIncrement(this) == 0) { - drainLoop(); - } - } - - private void drainLoop() { - for (;;) { - @SuppressWarnings("unchecked") - Queue> e = ELEMENTS.get(this); - if (e != null) { - int availableCount = e.size(); - int pendingCount = PENDING_COUNT.get(this); - int estimatedPermitCount = poolConfig.allocationStrategy().estimatePermitCount(); - - if (availableCount == 0) { - if (pendingCount > 0 && estimatedPermitCount > 0) { - final Borrower borrower = pendingPoll(); //shouldn't be null - if (borrower == null) { - continue; - } - ACQUIRED.incrementAndGet(this); - int permits = poolConfig.allocationStrategy().getPermits(1); - if (borrower.get() || permits == 0) { - ACQUIRED.decrementAndGet(this); - continue; - } - borrower.stopPendingCountdown(); - long start = clock.millis(); - Mono allocator; - Scheduler s = poolConfig.acquisitionScheduler(); - if (s != Schedulers.immediate()) { - allocator = poolConfig.allocator().publishOn(s); - } - else { - allocator = poolConfig.allocator(); - } - Mono primary = allocator.doOnEach(sig -> { - if (sig.isOnNext()) { - POOLABLE newInstance = sig.get(); - metricsRecorder.recordAllocationSuccessAndLatency(clock.millis() - start); - borrower.deliver(createSlot(newInstance)); - } - else if (sig.isOnError()) { - metricsRecorder.recordAllocationFailureAndLatency(clock.millis() - start); - ACQUIRED.decrementAndGet(this); - poolConfig.allocationStrategy() - .returnPermits(1); - borrower.fail(sig.getThrowable()); - drain(); - } - }); - - int toWarmup = permits - 1; - if (toWarmup < 1) { - primary.subscribe(alreadyPropagated -> {}, alreadyPropagatedOrLogged -> {}); - } - else { - logger.debug("should warm up {} extra resources", toWarmup); - final long startWarmupIteration = clock.millis(); - final Mono warmup = Flux - .range(1, toWarmup) - .flatMap(i -> allocator - .doOnNext(poolable -> { - logger.debug("warmed up extra resource {}/{}", i, toWarmup); - metricsRecorder.recordAllocationSuccessAndLatency(clock.millis() - startWarmupIteration); - e.offer(new QueuePooledRef<>(this, poolable)); - drain(); - }) - .onErrorResume(warmupError -> { - logger.debug("failed to warm up extra resource {}/{}: {}", i, toWarmup, warmupError.toString()); - metricsRecorder.recordAllocationFailureAndLatency(clock.millis() - startWarmupIteration); - //we return permits in case of warmup failure, but shouldn't further decrement ACQUIRED - poolConfig.allocationStrategy().returnPermits(1); - drain(); - return Mono.empty(); - }) - ) - .then(); - - //we ignore errors from primary (already propagated), which allows us to attempt - // the warmup in all cases. 
individual warmup failures decrement the permit and are logged - primary.onErrorResume(ignore -> Mono.empty()) - .thenMany(warmup) - //all errors and values are either propagated or logged, nothing to do - .subscribe(alreadyPropagated -> {}, alreadyPropagatedOrLogged -> {}); - } - } - } - else if (pendingCount > 0) { - if (isDisposed()) { - continue; - } - //there are objects ready and unclaimed in the pool + a pending - QueuePooledRef slot = e.poll(); - if (slot == null) continue; - - if (poolConfig.evictionPredicate().test(slot.poolable, slot) && slot.markInvalidate()) { - destroyPoolable(slot).subscribe(null, error -> drain(), this::drain); - continue; - } - - //there is a party currently pending acquiring - Borrower inner = pendingPoll(); - if (inner == null) { - if (!isDisposed()) { - e.offer(slot); - } - continue; - } - inner.stopPendingCountdown(); - ACQUIRED.incrementAndGet(this); - poolConfig.acquisitionScheduler().schedule(() -> inner.deliver(slot)); - } - } - - if (WIP.decrementAndGet(this) == 0) { - break; - } - } - } - - static final class QueuePooledRef extends AbstractPooledRef { - - final SimplePool pool; - - QueuePooledRef(SimplePool pool, T poolable) { - super(poolable, pool.metricsRecorder, pool.clock); - this.pool = pool; - } - - QueuePooledRef(QueuePooledRef oldRef) { - super(oldRef); - this.pool = oldRef.pool; - } - - @Override - public Mono release() { - return Mono.defer(() -> { - if (STATE.get(this) == STATE_RELEASED) { - return Mono.empty(); - } - - if (pool.isDisposed()) { - ACQUIRED.decrementAndGet(pool); //immediately clean up state - if (markInvalidate()) { - return pool.destroyPoolable(this); - } - else { - return Mono.empty(); - } - } - - Publisher cleaner; - try { - cleaner = pool.poolConfig.releaseHandler().apply(poolable); - } - catch (Throwable e) { - ACQUIRED.decrementAndGet(pool); //immediately clean up state - markReleased(); - return Mono.error(new IllegalStateException("Couldn't apply cleaner function", e)); - } - //the PoolRecyclerMono will wrap the cleaning Mono returned by the Function and perform state updates - return new QueuePoolRecyclerMono<>(cleaner, this); - }); - } - - @Override - public Mono invalidate() { - return Mono.defer(() -> { - if (markInvalidate()) { - //immediately clean up state - ACQUIRED.decrementAndGet(pool); - return pool.destroyPoolable(this).then(Mono.fromRunnable(pool::drain)); - } - else { - return Mono.empty(); - } - }); - } - } - - static final class QueueBorrowerMono extends Mono> { - - final SimplePool parent; - final Duration acquireTimeout; - - QueueBorrowerMono(SimplePool pool, Duration acquireTimeout) { - this.parent = pool; - this.acquireTimeout = acquireTimeout; - } - - @Override - public void subscribe(CoreSubscriber> actual) { - Objects.requireNonNull(actual, "subscribing with null"); - Borrower borrower = new Borrower<>(actual, parent, acquireTimeout); - actual.onSubscribe(borrower); - } - } - - private static final class QueuePoolRecyclerInner implements CoreSubscriber, Scannable, Subscription { - - final CoreSubscriber actual; - final SimplePool pool; - - //poolable can be checked for null to protect against protocol errors - QueuePooledRef pooledRef; - Subscription upstream; - long start; - - //once protects against multiple requests - volatile int once; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater ONCE = AtomicIntegerFieldUpdater.newUpdater(QueuePoolRecyclerInner.class, "once"); - - QueuePoolRecyclerInner(CoreSubscriber actual, QueuePooledRef pooledRef) { - this.actual = 
actual; - this.pooledRef = Objects.requireNonNull(pooledRef, "pooledRef"); - this.pool = pooledRef.pool; - } - - @Override - public void onSubscribe(Subscription s) { - if (Operators.validate(upstream, s)) { - this.upstream = s; - actual.onSubscribe(this); - this.start = pool.clock.millis(); - } - } - - @Override - public void onNext(Void o) { - //N/A - } - - @Override - public void onError(Throwable throwable) { - QueuePooledRef slot = pooledRef; - pooledRef = null; - if (slot == null) { - Operators.onErrorDropped(throwable, actual.currentContext()); - return; - } - - //some operators might immediately produce without request (eg. fromRunnable) - // we decrement ACQUIRED EXACTLY ONCE to indicate that the poolable was released by the user - if (ONCE.compareAndSet(this, 0, 1)) { - ACQUIRED.decrementAndGet(pool); - } - - //TODO should we separate reset errors? - pool.metricsRecorder.recordResetLatency(pool.clock.millis() - start); - - if (slot.markInvalidate()) { - pool.destroyPoolable(slot).subscribe(null, null, pool::drain); //TODO manage errors? - } - - actual.onError(throwable); - } - - @Override - public void onComplete() { - QueuePooledRef slot = pooledRef; - pooledRef = null; - if (slot == null) { - return; - } - - //some operators might immediately produce without request (eg. fromRunnable) - // we decrement ACQUIRED EXACTLY ONCE to indicate that the poolable was released by the user - if (ONCE.compareAndSet(this, 0, 1)) { - ACQUIRED.decrementAndGet(pool); - } - - pool.metricsRecorder.recordResetLatency(pool.clock.millis() - start); - - pool.maybeRecycleAndDrain(slot); - actual.onComplete(); - } - - @Override - public void request(long l) { - if (Operators.validate(l)) { - upstream.request(l); - // we decrement ACQUIRED EXACTLY ONCE to indicate that the poolable was released by the user - if (ONCE.compareAndSet(this, 0, 1)) { - ACQUIRED.decrementAndGet(pool); - } - } - } - - @Override - public void cancel() { - //NO-OP, once requested, release cannot be cancelled - } - - - @Override - @SuppressWarnings("rawtypes") - public Object scanUnsafe(Attr key) { - if (key == Attr.ACTUAL) return actual; - if (key == Attr.PARENT) return upstream; - if (key == Attr.CANCELLED) return false; - if (key == Attr.TERMINATED) return pooledRef == null; - if (key == Attr.BUFFERED) return (pooledRef == null) ? 
0 : 1; - return null; - } - } - - private static final class QueuePoolRecyclerMono extends Mono implements Scannable { - - final Publisher source; - final AtomicReference> slotRef; - - QueuePoolRecyclerMono(Publisher source, QueuePooledRef poolSlot) { - this.source = source; - this.slotRef = new AtomicReference<>(poolSlot); - } - - @Override - public void subscribe(CoreSubscriber actual) { - QueuePooledRef slot = slotRef.getAndSet(null); - if (slot == null || !slot.markReleased()) { - Operators.complete(actual); - } - else { - QueuePoolRecyclerInner qpr = new QueuePoolRecyclerInner<>(actual, slot); - source.subscribe(qpr); - } - } - - @Override - @Nullable - @SuppressWarnings("rawtypes") - public Object scanUnsafe(Attr key) { - if (key == Attr.PREFETCH) return Integer.MAX_VALUE; - if (key == Attr.PARENT) return source; - return null; - } - } - -} diff --git a/src/test/java/reactor/pool/SimpleFifoPoolTest.java b/src/test/java/reactor/pool/AcquireDefaultPoolTest.java similarity index 89% rename from src/test/java/reactor/pool/SimpleFifoPoolTest.java rename to src/test/java/reactor/pool/AcquireDefaultPoolTest.java index 825066fc..e47e9a70 100644 --- a/src/test/java/reactor/pool/SimpleFifoPoolTest.java +++ b/src/test/java/reactor/pool/AcquireDefaultPoolTest.java @@ -53,9 +53,14 @@ import static reactor.pool.PoolBuilder.from; /** + * This test class uses both modes of acquire on the {@link PoolBuilder#buildPool()} default + * pool. + * * @author Simon Baslé */ -class SimpleFifoPoolTest { +//TODO merge with CommonPoolTest ? +//TODO ensure correct cleanup of executors and schedulers in all tests +class AcquireDefaultPoolTest { private Disposable.Composite disposeList; @@ -75,25 +80,22 @@ T autoDispose(T toDispose) { } //==utils for package-private config== - static final PoolConfig poolableTestConfig(int minSize, int maxSize, Mono allocator) { + static final PoolBuilder> poolableTestBuilder(int minSize, int maxSize, Mono allocator) { return from(allocator) .sizeBetween(minSize, maxSize) .releaseHandler(pt -> Mono.fromRunnable(pt::clean)) - .evictionPredicate((value, metadata) -> !value.isHealthy()) - .buildConfig(); + .evictionPredicate((value, metadata) -> !value.isHealthy()); } - static final PoolConfig poolableTestConfig(int minSize, int maxSize, Mono allocator, Scheduler deliveryScheduler) { + static final PoolBuilder> poolableTestBuilder(int minSize, int maxSize, Mono allocator, Scheduler deliveryScheduler) { return from(allocator) .sizeBetween(minSize, maxSize) - .sizeBetween(0, maxSize) .releaseHandler(pt -> Mono.fromRunnable(pt::clean)) .evictionPredicate((value, metadata) -> !value.isHealthy()) - .acquisitionScheduler(deliveryScheduler) - .buildConfig(); + .acquisitionScheduler(deliveryScheduler); } - static final PoolConfig poolableTestConfig(int minSize, int maxSize, Mono allocator, + static final PoolBuilder> poolableTestBuilder(int minSize, int maxSize, Mono allocator, Consumer additionalCleaner) { return from(allocator) .sizeBetween(minSize, maxSize) @@ -101,8 +103,7 @@ static final PoolConfig poolableTestConfig(int minSize, int maxSiz poolableTest.clean(); additionalCleaner.accept(poolableTest); })) - .evictionPredicate((value, metadata) -> !value.isHealthy()) - .buildConfig(); + .evictionPredicate((value, metadata) -> !value.isHealthy()); } //====== @@ -111,11 +112,11 @@ void demonstrateAcquireInScopePipeline() throws InterruptedException { AtomicInteger counter = new AtomicInteger(); AtomicReference releaseRef = new AtomicReference<>(); - SimpleFifoPool pool = new 
SimpleFifoPool<>( + InstrumentedPool pool = from(Mono.just("Hello Reactive World")) .sizeBetween(0, 1) .releaseHandler(s -> Mono.fromRunnable(()-> releaseRef.set(s))) - .buildConfig()); + .buildPool(); Flux words = pool.withPoolable(poolable -> Mono.just(poolable) //simulate deriving a value from the resource (ie. query from DB connection) @@ -136,8 +137,8 @@ void demonstrateAcquireInScopePipeline() throws InterruptedException { Thread.sleep(500); //we've finished processing, let's check resource has been automatically released assertThat(counter).as("after all emitted").hasValue(3); - assertThat(pool.poolConfig.allocationStrategy().estimatePermitCount()).as("allocation permits").isZero(); - assertThat(pool.elements).as("available").hasSize(1); + assertThat(pool.metrics().allocatedSize()).as("allocation permits").isEqualTo(pool.metrics().getMaxAllocatedSize()); + assertThat(pool.metrics().idleSize()).as("available").isOne(); assertThat(releaseRef).as("released").hasValue("Hello Reactive World"); } @@ -171,11 +172,11 @@ void allocatedReleasedOrAbortedIfCancelRequestRace(int round, AtomicInteger newC try { - PoolConfig testConfig = poolableTestConfig(0, 1, + InstrumentedPool pool = poolableTestBuilder(0, 1, Mono.defer(() -> Mono.delay(Duration.ofMillis(50)).thenReturn(new PoolableTest(newCount.incrementAndGet()))) .subscribeOn(scheduler), - pt -> releasedCount.incrementAndGet()); - SimpleFifoPool pool = new SimpleFifoPool<>(testConfig); + pt -> releasedCount.incrementAndGet()) +.buildPool(); //acquire the only element and capture the subscription, don't request just yet CountDownLatch latch = new CountDownLatch(1); @@ -214,10 +215,10 @@ protected void hookOnSubscribe(Subscription subscription) { void defaultThreadDeliveringWhenHasElements() throws InterruptedException { AtomicReference threadName = new AtomicReference<>(); Scheduler acquireScheduler = Schedulers.newSingle("acquire"); - PoolConfig testConfig = poolableTestConfig(1, 1, + InstrumentedPool pool = poolableTestBuilder(1, 1, Mono.fromCallable(PoolableTest::new) - .subscribeOn(Schedulers.newParallel("poolable test allocator"))); - SimpleFifoPool pool = new SimpleFifoPool<>(testConfig); + .subscribeOn(Schedulers.newParallel("poolable test allocator"))) +.buildPool(); pool.warmup().block(); //the pool is started and warmed up with one available element @@ -237,10 +238,10 @@ void defaultThreadDeliveringWhenHasElements() throws InterruptedException { void defaultThreadDeliveringWhenNoElementsButNotFull() throws InterruptedException { AtomicReference threadName = new AtomicReference<>(); Scheduler acquireScheduler = Schedulers.newSingle("acquire"); - PoolConfig testConfig = poolableTestConfig(0, 1, + InstrumentedPool pool = poolableTestBuilder(0, 1, Mono.fromCallable(PoolableTest::new) - .subscribeOn(Schedulers.newParallel("poolable test allocator"))); - SimpleFifoPool pool = new SimpleFifoPool<>(testConfig); + .subscribeOn(Schedulers.newParallel("poolable test allocator"))) +.buildPool(); //the pool is started with no elements, and has capacity for 1 //we prepare to acquire, which would allocate the element @@ -262,10 +263,10 @@ void defaultThreadDeliveringWhenNoElementsAndFull() throws InterruptedException Scheduler acquireScheduler = Schedulers.newSingle("acquire"); Scheduler releaseScheduler = Schedulers.fromExecutorService( Executors.newSingleThreadScheduledExecutor((r -> new Thread(r,"release")))); - PoolConfig testConfig = poolableTestConfig(1, 1, + InstrumentedPool pool = poolableTestBuilder(1, 1, 
Mono.fromCallable(PoolableTest::new) - .subscribeOn(Schedulers.newParallel("poolable test allocator"))); - SimpleFifoPool pool = new SimpleFifoPool<>(testConfig); + .subscribeOn(Schedulers.newParallel("poolable test allocator"))) +.buildPool(); //the pool is started with one elements, and has capacity for 1. //we actually first acquire that element so that next acquire will wait for a release @@ -322,11 +323,10 @@ void defaultThreadDeliveringWhenNoElementsAndFullAndRaceDrain(int round, AtomicI Scheduler racerAcquireScheduler = Schedulers.fromExecutorService( Executors.newSingleThreadScheduledExecutor((r -> new Thread(r,"racerAcquire")))); - PoolConfig testConfig = poolableTestConfig(1, 1, + InstrumentedPool pool = poolableTestBuilder(1, 1, Mono.fromCallable(() -> new PoolableTest(newCount.getAndIncrement())) - .subscribeOn(Schedulers.newParallel("poolable test allocator"))); - - SimpleFifoPool pool = new SimpleFifoPool<>(testConfig); + .subscribeOn(Schedulers.newParallel("poolable test allocator"))) +.buildPool(); //the pool is started with one elements, and has capacity for 1. //we actually first acquire that element so that next acquire will wait for a release @@ -360,11 +360,11 @@ void consistentThreadDeliveringWhenHasElements() throws InterruptedException { Scheduler deliveryScheduler = Schedulers.newSingle("delivery"); AtomicReference threadName = new AtomicReference<>(); Scheduler acquireScheduler = Schedulers.newSingle("acquire"); - PoolConfig testConfig = poolableTestConfig(1, 1, + InstrumentedPool pool = poolableTestBuilder(1, 1, Mono.fromCallable(PoolableTest::new) .subscribeOn(Schedulers.newParallel("poolable test allocator")), - deliveryScheduler); - SimpleFifoPool pool = new SimpleFifoPool<>(testConfig); + deliveryScheduler) +.buildPool(); //the pool is started with one available element //we prepare to acquire it @@ -384,11 +384,11 @@ void consistentThreadDeliveringWhenNoElementsButNotFull() throws InterruptedExce Scheduler deliveryScheduler = Schedulers.newSingle("delivery"); AtomicReference threadName = new AtomicReference<>(); Scheduler acquireScheduler = Schedulers.newSingle("acquire"); - PoolConfig testConfig = poolableTestConfig(0, 1, + InstrumentedPool pool = poolableTestBuilder(0, 1, Mono.fromCallable(PoolableTest::new) .subscribeOn(Schedulers.newParallel("poolable test allocator")), - deliveryScheduler); - SimpleFifoPool pool = new SimpleFifoPool<>(testConfig); + deliveryScheduler) +.buildPool(); //the pool is started with no elements, and has capacity for 1 //we prepare to acquire, which would allocate the element @@ -411,11 +411,11 @@ void consistentThreadDeliveringWhenNoElementsAndFull() throws InterruptedExcepti Scheduler acquireScheduler = Schedulers.newSingle("acquire"); Scheduler releaseScheduler = Schedulers.fromExecutorService( Executors.newSingleThreadScheduledExecutor((r -> new Thread(r,"release")))); - PoolConfig testConfig = poolableTestConfig(1, 1, + InstrumentedPool pool = poolableTestBuilder(1, 1, Mono.fromCallable(PoolableTest::new) .subscribeOn(Schedulers.newParallel("poolable test allocator")), - deliveryScheduler); - SimpleFifoPool pool = new SimpleFifoPool<>(testConfig); + deliveryScheduler) +.buildPool(); //the pool is started with one elements, and has capacity for 1. 
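
Across these hunks the migration is mechanical: instead of materializing a PoolConfig and wrapping it in a SimpleFifoPool, the tests finish the PoolBuilder chain with buildPool() and drive the resulting InstrumentedPool directly. A minimal sketch of that manual acquire/release cycle, using a plain String allocator and made-up class/resource names purely for illustration:

import reactor.core.publisher.Mono;
import reactor.pool.InstrumentedPool;
import reactor.pool.PoolBuilder;
import reactor.pool.PooledRef;

public class ManualAcquireSketch {

	public static void main(String[] args) {
		// Same shape as poolableTestBuilder(...).buildPool(): lazy allocator, bounded size.
		InstrumentedPool<String> pool = PoolBuilder
				.from(Mono.fromCallable(() -> "pooled resource"))
				.sizeBetween(0, 1)
				.buildPool();

		// Manual mode: acquire() yields a PooledRef that must be released explicitly.
		PooledRef<String> ref = pool.acquire().block();
		System.out.println("borrowed: " + ref.poolable());

		// Releasing puts the resource back in the idle queue, which the metrics expose.
		ref.release().block();
		System.out.println("idle after release: " + pool.metrics().idleSize());

		pool.dispose();
	}
}
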
//we actually first acquire that element so that next acquire will wait for a release @@ -460,11 +460,11 @@ void consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrain(int i) throws I Executors.newSingleThreadScheduledExecutor((r -> new Thread(r,"racerRelease")))); Scheduler racerAcquireScheduler = Schedulers.newSingle("racerAcquire"); - PoolConfig testConfig = poolableTestConfig(1, 1, + InstrumentedPool pool = poolableTestBuilder(1, 1, Mono.fromCallable(() -> new PoolableTest(newCount.getAndIncrement())) .subscribeOn(Schedulers.newParallel("poolable test allocator")), - deliveryScheduler); - SimpleFifoPool pool = new SimpleFifoPool<>(testConfig); + deliveryScheduler) +.buildPool(); //the pool is started with one elements, and has capacity for 1. //we actually first acquire that element so that next acquire will wait for a release @@ -498,10 +498,10 @@ void acquireReleaseRaceWithMinSize_loop() { final Scheduler racer = Schedulers.fromExecutorService(Executors.newFixedThreadPool(2)); AtomicInteger newCount = new AtomicInteger(); try { - PoolConfig testConfig = from(Mono.fromCallable(() -> new PoolableTest(newCount.getAndIncrement()))) + InstrumentedPool pool = + from(Mono.fromCallable(() -> new PoolableTest(newCount.getAndIncrement()))) .sizeBetween(4, 5) - .buildConfig(); - SimpleFifoPool pool = new SimpleFifoPool<>(testConfig); + .buildPool(); for (int i = 0; i < 100; i++) { RaceTestUtils.race(() -> pool.acquire().block().release().block(), @@ -526,8 +526,9 @@ class AcquireInScopeTest { @DisplayName("acquire delays instead of allocating past maxSize") void acquireDelaysNotAllocate() { AtomicInteger newCount = new AtomicInteger(); - SimpleFifoPool pool = new SimpleFifoPool<>(poolableTestConfig(2, 3, - Mono.defer(() -> Mono.just(new PoolableTest(newCount.incrementAndGet()))))); + InstrumentedPool pool = poolableTestBuilder(2, 3, + Mono.defer(() -> Mono.just(new PoolableTest(newCount.incrementAndGet())))) + .buildPool(); pool.withPoolable(poolable -> Mono.just(poolable).delayElement(Duration.ofMillis(500))).subscribe(); pool.withPoolable(poolable -> Mono.just(poolable).delayElement(Duration.ofMillis(500))).subscribe(); @@ -564,11 +565,11 @@ void allocatedReleasedOrAbortedIfCancelRequestRace() throws InterruptedException void allocatedReleasedOrAbortedIfCancelRequestRace(int round, AtomicInteger newCount, AtomicInteger releasedCount, boolean cancelFirst) throws InterruptedException { Scheduler scheduler = Schedulers.newParallel("poolable test allocator"); - PoolConfig testConfig = poolableTestConfig(0, 1, + InstrumentedPool pool = poolableTestBuilder(0, 1, Mono.defer(() -> Mono.delay(Duration.ofMillis(50)).thenReturn(new PoolableTest(newCount.incrementAndGet()))) .subscribeOn(scheduler), - pt -> releasedCount.incrementAndGet()); - SimpleFifoPool pool = new SimpleFifoPool<>(testConfig); + pt -> releasedCount.incrementAndGet()) +.buildPool(); //acquire the only element and capture the subscription, don't request just yet CountDownLatch latch = new CountDownLatch(1); @@ -603,10 +604,10 @@ protected void hookOnSubscribe(Subscription subscription) { void defaultThreadDeliveringWhenHasElements() throws InterruptedException { AtomicReference threadName = new AtomicReference<>(); Scheduler acquireScheduler = Schedulers.newSingle("acquire"); - PoolConfig testConfig = poolableTestConfig(1, 1, + InstrumentedPool pool = poolableTestBuilder(1, 1, Mono.fromCallable(PoolableTest::new) - .subscribeOn(Schedulers.newParallel("poolable test allocator"))); - SimpleFifoPool pool = new 
SimpleFifoPool<>(testConfig); + .subscribeOn(Schedulers.newParallel("poolable test allocator"))) +.buildPool(); pool.warmup().block(); //the pool is started and warmed up with one available element @@ -626,10 +627,10 @@ void defaultThreadDeliveringWhenHasElements() throws InterruptedException { void defaultThreadDeliveringWhenNoElementsButNotFull() throws InterruptedException { AtomicReference threadName = new AtomicReference<>(); Scheduler acquireScheduler = Schedulers.newSingle("acquire"); - PoolConfig testConfig = poolableTestConfig(0, 1, + InstrumentedPool pool = poolableTestBuilder(0, 1, Mono.fromCallable(PoolableTest::new) - .subscribeOn(Schedulers.newParallel("poolable test allocator"))); - SimpleFifoPool pool = new SimpleFifoPool<>(testConfig); + .subscribeOn(Schedulers.newParallel("poolable test allocator"))) +.buildPool(); //the pool is started with no elements, and has capacity for 1 //we prepare to acquire, which would allocate the element @@ -651,10 +652,10 @@ void defaultThreadDeliveringWhenNoElementsAndFull() throws InterruptedException Scheduler acquireScheduler = Schedulers.newSingle("acquire"); Scheduler releaseScheduler = Schedulers.fromExecutorService( Executors.newSingleThreadScheduledExecutor((r -> new Thread(r,"release")))); - PoolConfig testConfig = poolableTestConfig(1, 1, + InstrumentedPool pool = poolableTestBuilder(1, 1, Mono.fromCallable(PoolableTest::new) - .subscribeOn(Schedulers.newParallel("poolable test allocator"))); - SimpleFifoPool pool = new SimpleFifoPool<>(testConfig); + .subscribeOn(Schedulers.newParallel("poolable test allocator"))) +.buildPool(); //the pool is started with one elements, and has capacity for 1. //we actually first acquire that element so that next acquire will wait for a release @@ -713,11 +714,10 @@ void defaultThreadDeliveringWhenNoElementsAndFullAndRaceDrain(int round, AtomicI Scheduler allocatorScheduler = Schedulers.newParallel("poolable test allocator"); try { - PoolConfig testConfig = poolableTestConfig(1, 1, + InstrumentedPool pool = poolableTestBuilder(1, 1, Mono.fromCallable(() -> new PoolableTest(newCount.getAndIncrement())) - .subscribeOn(allocatorScheduler)); - - SimpleFifoPool pool = new SimpleFifoPool<>(testConfig); + .subscribeOn(allocatorScheduler)) +.buildPool(); //the pool is started with one elements, and has capacity for 1. 
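
The nested AcquireInScopeTest class exercises the other acquisition style, withPoolable(Function), where the pool releases the resource by itself once the publisher derived from it terminates. A small sketch of that scoped style, with a hypothetical String resource standing in for PoolableTest:

import java.util.Arrays;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.pool.InstrumentedPool;
import reactor.pool.PoolBuilder;

public class ScopedAcquireSketch {

	public static void main(String[] args) {
		InstrumentedPool<String> pool = PoolBuilder
				.from(Mono.just("Hello Reactive World"))
				.sizeBetween(0, 1)
				.buildPool();

		// withPoolable acquires, applies the function, and releases when the inner publisher completes.
		Flux<String> words = pool.withPoolable(resource ->
				Mono.just(resource)
				    .map(r -> r.split(" "))
				    .flatMapIterable(Arrays::asList));

		words.doOnNext(System.out::println).blockLast();

		// Once the scope has closed, the single resource is idle again.
		System.out.println("idle: " + pool.metrics().idleSize());
		pool.dispose();
	}
}
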
//we actually first acquire that element so that next acquire will wait for a release @@ -765,11 +765,11 @@ void consistentThreadDeliveringWhenHasElements() throws InterruptedException { Scheduler deliveryScheduler = Schedulers.newSingle("delivery"); AtomicReference threadName = new AtomicReference<>(); Scheduler acquireScheduler = Schedulers.newSingle("acquire"); - PoolConfig testConfig = poolableTestConfig(1, 1, + InstrumentedPool pool = poolableTestBuilder(1, 1, Mono.fromCallable(PoolableTest::new) .subscribeOn(Schedulers.newParallel("poolable test allocator")), - deliveryScheduler); - SimpleFifoPool pool = new SimpleFifoPool<>(testConfig); + deliveryScheduler) +.buildPool(); //the pool is started with one available element //we prepare to acquire it @@ -789,11 +789,11 @@ void consistentThreadDeliveringWhenNoElementsButNotFull() throws InterruptedExce Scheduler deliveryScheduler = Schedulers.newSingle("delivery"); AtomicReference threadName = new AtomicReference<>(); Scheduler acquireScheduler = Schedulers.newSingle("acquire"); - PoolConfig testConfig = poolableTestConfig(0, 1, + InstrumentedPool pool = poolableTestBuilder(0, 1, Mono.fromCallable(PoolableTest::new) .subscribeOn(Schedulers.newParallel("poolable test allocator")), - deliveryScheduler); - SimpleFifoPool pool = new SimpleFifoPool<>(testConfig); + deliveryScheduler) +.buildPool(); //the pool is started with no elements, and has capacity for 1 //we prepare to acquire, which would allocate the element @@ -816,11 +816,11 @@ void consistentThreadDeliveringWhenNoElementsAndFull() throws InterruptedExcepti Scheduler acquireScheduler = Schedulers.newSingle("acquire"); Scheduler releaseScheduler = Schedulers.fromExecutorService( Executors.newSingleThreadScheduledExecutor((r -> new Thread(r,"release")))); - PoolConfig testConfig = poolableTestConfig(1, 1, + InstrumentedPool pool = poolableTestBuilder(1, 1, Mono.fromCallable(PoolableTest::new) .subscribeOn(Schedulers.newParallel("poolable test allocator")), - deliveryScheduler); - SimpleFifoPool pool = new SimpleFifoPool<>(testConfig); + deliveryScheduler) +.buildPool(); //the pool is started with one elements, and has capacity for 1. //we actually first acquire that element so that next acquire will wait for a release @@ -865,11 +865,11 @@ void consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrain(int i) throws I Executors.newSingleThreadScheduledExecutor((r -> new Thread(r,"racerRelease")))); Scheduler racerAcquireScheduler = Schedulers.newSingle("racerAcquire"); - PoolConfig testConfig = poolableTestConfig(1, 1, + InstrumentedPool pool = poolableTestBuilder(1, 1, Mono.fromCallable(() -> new PoolableTest(newCount.getAndIncrement())) .subscribeOn(Schedulers.newParallel("poolable test allocator")), - deliveryScheduler); - SimpleFifoPool pool = new SimpleFifoPool<>(testConfig); + deliveryScheduler) +.buildPool(); //the pool is started with one elements, and has capacity for 1. 
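
The consistentThreadDelivering* tests above all hinge on acquisitionScheduler(Scheduler): when it is set, the acquired resource should be handed to the subscriber on that scheduler, no matter whether the allocator, the releasing thread, or the subscriber itself triggers the delivery. A hedged sketch of that configuration; the "delivery" and "allocator" scheduler names are illustrative only:

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.pool.InstrumentedPool;
import reactor.pool.PoolBuilder;
import reactor.pool.PooledRef;

public class DeliveryThreadSketch {

	public static void main(String[] args) {
		Scheduler delivery = Schedulers.newSingle("delivery");
		Scheduler allocator = Schedulers.newParallel("allocator");

		InstrumentedPool<String> pool = PoolBuilder
				.from(Mono.fromCallable(() -> "pooled resource").subscribeOn(allocator))
				.sizeBetween(0, 1)
				.acquisitionScheduler(delivery)
				.buildPool();

		// Allocation happens on the "allocator" scheduler, yet the PooledRef should be
		// delivered to the subscriber on the "delivery" thread.
		pool.acquire()
		    .doOnNext(ref -> System.out.println("delivered on " + Thread.currentThread().getName()))
		    .flatMap(PooledRef::release)
		    .block();

		pool.dispose();
		delivery.dispose();
		allocator.dispose();
	}
}
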
//we actually first acquire that element so that next acquire will wait for a release @@ -901,12 +901,12 @@ void consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrain(int i) throws I @Test void stillacquiredAfterPoolDisposedMaintainsCount() { AtomicInteger cleanerCount = new AtomicInteger(); - SimpleFifoPool pool = new SimpleFifoPool<>( + InstrumentedPool pool = from(Mono.fromCallable(PoolableTest::new)) .sizeBetween(3, 3) .releaseHandler(p -> Mono.fromRunnable(cleanerCount::incrementAndGet)) .evictionPredicate((value, metadata) -> !value.isHealthy()) - .buildConfig()); + .buildPool(); PooledRef acquired1 = pool.acquire().block(); PooledRef acquired2 = pool.acquire().block(); @@ -918,13 +918,13 @@ void stillacquiredAfterPoolDisposedMaintainsCount() { pool.dispose(); - assertThat(pool.acquired).as("before releases").isEqualTo(3); + assertThat(pool.metrics().acquiredSize()).as("before releases").isEqualTo(3); acquired1.release().block(); acquired2.release().block(); acquired3.release().block(); - assertThat(pool.acquired).as("after releases").isEqualTo(0); + assertThat(pool.metrics().acquiredSize()).as("after releases").isEqualTo(0); } @SuppressWarnings("FutureReturnValueIgnored") @@ -936,10 +936,10 @@ void concurrentAcquireCorrectlyAccountsAll(int parallelism, int loops) throws In autoDispose(executorService::shutdownNow); for (int l = 0; l < loops; l++) { - PoolConfig config = PoolBuilder.from(Mono.just("foo")) - .sizeBetween(0, 100) - .buildConfig(); - SimpleFifoPool fifoPool = autoDispose(new SimpleFifoPool<>(config)); + InstrumentedPool fifoPool = autoDispose( + PoolBuilder.from(Mono.just("foo")) + .sizeBetween(0, 100) + .buildPool()); CountDownLatch latch = new CountDownLatch(parallelism); for (int i = 0; i < parallelism; i++) { diff --git a/src/test/java/reactor/pool/CommonPoolTest.java b/src/test/java/reactor/pool/CommonPoolTest.java index 203e5ec1..7fc0f8b8 100644 --- a/src/test/java/reactor/pool/CommonPoolTest.java +++ b/src/test/java/reactor/pool/CommonPoolTest.java @@ -18,11 +18,9 @@ import java.io.Closeable; import java.io.IOException; -import java.time.Clock; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.Formatter; import java.util.FormatterClosedException; import java.util.List; @@ -44,6 +42,7 @@ import org.awaitility.Awaitility; import org.hamcrest.CoreMatchers; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Tag; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -72,22 +71,23 @@ */ public class CommonPoolTest { - static final Function, AbstractPool> simplePoolFifo() { + static final Function, AbstractPool> lruFifo() { return new Function, AbstractPool>() { @Override public AbstractPool apply(PoolBuilder builder) { - return (AbstractPool) builder.fifo(); + return (AbstractPool) builder.buildPool(); } @Override public String toString() { - return "simplePool FIFO"; + return "LRU & FIFO"; } }; } - static final Function, AbstractPool> simplePoolLifo() { + static final Function, AbstractPool> lruLifo() { return new Function, AbstractPool>() { + @SuppressWarnings("deprecation") @Override public AbstractPool apply(PoolBuilder builder) { return (AbstractPool) builder.lifo(); @@ -95,21 +95,58 @@ public AbstractPool apply(PoolBuilder builder) { @Override public String toString() { - return "simplePool LIFO"; + return "LRU & LIFO"; + } + }; + } + + static final Function, 
AbstractPool> mruFifo() { + return new Function, AbstractPool>() { + @Override + public AbstractPool apply(PoolBuilder builder) { + return (AbstractPool) builder.idleResourceReuseMruOrder().buildPool(); + } + + @Override + public String toString() { + return "MRU & FIFO"; + } + }; + } + + static final Function, AbstractPool> mruLifo() { + return new Function, AbstractPool>() { + @SuppressWarnings("deprecation") + @Override + public AbstractPool apply(PoolBuilder builder) { + return (AbstractPool) builder.idleResourceReuseMruOrder().lifo(); + } + + @Override + public String toString() { + return "MRU & LIFO"; } }; } static List, AbstractPool>> allPools() { - return Arrays.asList(simplePoolFifo(), simplePoolLifo()); + return Arrays.asList(lruFifo(), lruLifo(), mruFifo(), mruLifo()); } static List, AbstractPool>> fifoPools() { - return Collections.singletonList(simplePoolFifo()); + return Arrays.asList(lruFifo(), mruFifo()); } static List, AbstractPool>> lifoPools() { - return Collections.singletonList(simplePoolLifo()); + return Arrays.asList(lruLifo(), mruLifo()); + } + + static List, AbstractPool>> mruPools() { + return Arrays.asList(mruFifo(), mruLifo()); + } + + static List, AbstractPool>> lruPools() { + return Arrays.asList(lruFifo(), lruLifo()); } @ParameterizedTest @@ -548,6 +585,40 @@ void smokeTestAsyncLifo(Function, AbstractPool, AbstractPool> configAdjuster) { + AtomicInteger newCount = new AtomicInteger(); + PoolBuilder builder = + PoolBuilder.from(Mono.fromSupplier(() -> new PoolableTest(newCount.incrementAndGet()))) + .sizeBetween(0, 3) + .releaseHandler(pt -> Mono.fromRunnable(pt::clean)) + .evictionPredicate((value, metadata) -> !value.isHealthy()); + AbstractPool pool = configAdjuster.apply(builder); + + PooledRef ref1 = pool.acquire().block(); + PooledRef ref2 = pool.acquire().block(); + PooledRef ref3 = pool.acquire().block(); + + ref2.release().block(); + ref1.release().block(); + ref3.release().block(); + + assertThat(pool.idleSize()).as("pool fully idle").isEqualTo(3); + + pool.acquire() + .as(StepVerifier::create) + .assertNext(ref -> assertThat(ref.poolable().id).as("MRU first call returns ref3").isEqualTo(3)) + .verifyComplete(); + + pool.acquire() + .as(StepVerifier::create) + .assertNext(ref -> assertThat(ref.poolable().id).as("MRU second call returns ref1").isEqualTo(1)) + .verifyComplete(); + + assertThat(pool.idleSize()).as("idleSize after 2 MRU calls").isOne(); + } + @ParameterizedTest @MethodSource("allPools") void firstAcquireCausesWarmupWithMinSize(Function, Pool> configAdjuster) @@ -2054,8 +2125,8 @@ void invalidateRaceIdleState(Function, AbstractPool, AbstractPool> configAdjuster) { + @MethodSource("lruPools") + void releaseAllOnAcquire(Function, AbstractPool> configAdjuster) { AtomicInteger intSource = new AtomicInteger(); AtomicInteger releasedIndex = new AtomicInteger(); ConcurrentLinkedQueue destroyed = new ConcurrentLinkedQueue<>(); @@ -2084,12 +2155,12 @@ void releaseOnAcquire(Function, AbstractPool> c assertThat(destroyed).as("none destroyed so far").isEmpty(); //set the release predicate to release <= 3 on acquire - releasedIndex.set(3); + releasedIndex.set(4); - assertThat(pool.acquire().block().poolable()).as("acquire post idle").isEqualTo(4); - assertThat(intSource).as("didn't generate a new value").hasValue(4); - assertThat(destroyed).as("single acquire released all evictable idle") - .containsExactly(1, 2, 3); + assertThat(pool.acquire().block().poolable()).as("allocated post idle").isEqualTo(5); + assertThat(intSource).as("did generate a new 
value").hasValue(5); + assertThat(destroyed).as("single acquire released all idle") + .containsExactly(1, 2, 3, 4); } @ParameterizedTest diff --git a/src/test/java/reactor/pool/PendingAcquireLifoBehaviorTest.java b/src/test/java/reactor/pool/PendingAcquireLifoBehaviorTest.java new file mode 100644 index 00000000..b0205d01 --- /dev/null +++ b/src/test/java/reactor/pool/PendingAcquireLifoBehaviorTest.java @@ -0,0 +1,263 @@ +/* + * Copyright (c) 2018-Present Pivotal Software Inc, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.pool; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import reactor.core.Disposable; +import reactor.core.Disposables; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import reactor.pool.TestUtils.PoolableTest; +import reactor.test.util.RaceTestUtils; + +import static org.assertj.core.api.Assertions.assertThat; +import static reactor.pool.PoolBuilder.from; + +/** + * @author Simon Baslé + */ +@SuppressWarnings("deprecation") +class PendingAcquireLifoBehaviorTest { + + private Disposable.Composite disposeList; + + @BeforeEach + void initComposite() { + disposeList = Disposables.composite(); + } + + @AfterEach + void cleanup() { + disposeList.dispose(); + } + + T autoDispose(T toDispose) { + disposeList.add(toDispose); + return toDispose; + } + + static final PoolBuilder> poolableTestBuilder(int minSize, int maxSize, Mono allocator, Scheduler deliveryScheduler) { + return from(allocator) + .sizeBetween(minSize, maxSize) + .releaseHandler(pt -> Mono.fromRunnable(pt::clean)) + .evictionPredicate((value, metadata) -> !value.isHealthy()) + .acquisitionScheduler(deliveryScheduler); + } + //====== + + @Test + @Tag("loops") + void consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrain_loop() throws InterruptedException { + for (int i = 0; i < 10_000; i++) { + try { + consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrain(i); + } + finally { + cleanup(); + initComposite(); + } + } + } + + @Test + void consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrain() throws InterruptedException { + consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrain(0); + } + + void consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrain(int i) throws InterruptedException { + Scheduler allocatorScheduler = autoDispose(Schedulers.newParallel("poolable test allocator")); + Scheduler deliveryScheduler = autoDispose(Schedulers.newSingle("delivery")); + Scheduler 
acquire1Scheduler = autoDispose(Schedulers.newSingle("acquire1")); + Scheduler racerScheduler = autoDispose(Schedulers.fromExecutorService(Executors.newFixedThreadPool(2, r -> new Thread(r, "racer")))); + + AtomicReference threadName = new AtomicReference<>(); + AtomicInteger newCount = new AtomicInteger(); + + PoolBuilder> testBuilder = poolableTestBuilder(1, 1, + Mono.fromCallable(() -> new PoolableTest(newCount.getAndIncrement())) + .subscribeOn(allocatorScheduler), + deliveryScheduler); + InstrumentedPool pool = testBuilder.lifo(); + + //the pool is started with one elements, and has capacity for 1. + //we actually first acquire that element so that next acquire will wait for a release + PooledRef uniqueSlot = pool.acquire().block(); + assertThat(uniqueSlot).isNotNull(); + + //we prepare two more acquires + Mono> firstBorrower = pool.acquire(); + Mono> secondBorrower = pool.acquire(); + + CountDownLatch latch = new CountDownLatch(1); + + //we'll enqueue a first acquire from a first thread + //in parallel, we'll race a second acquire AND release the unique element (each on their dedicated threads) + //we expect the release might sometimes win, which would mean acquire 1 would get served. mostly we want to verify delivery thread though + acquire1Scheduler.schedule(() -> firstBorrower.subscribe(v -> threadName.compareAndSet(null, Thread.currentThread().getName()) + , e -> latch.countDown(), latch::countDown)); + RaceTestUtils.race(() -> secondBorrower.subscribe(v -> threadName.compareAndSet(null, Thread.currentThread().getName()) + , e -> latch.countDown(), latch::countDown), + uniqueSlot.release()::block, + racerScheduler); + + latch.await(1, TimeUnit.SECONDS); + + //we expect that, consistently, the poolable is delivered on a `delivery` thread + assertThat(threadName.get()).as("round #" + i).startsWith("delivery-"); + + //we expect that only 1 element was created + assertThat(newCount).as("elements created in round " + i).hasValue(1); + } + + @Test + @Tag("loops") + void consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrainWithPoolable_loop() throws InterruptedException { + for (int i = 0; i < 10_000; i++) { + try { + consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrainWithPoolable(i); + } + finally { + cleanup(); + initComposite(); + } + } + } + + @Test + void consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrainWithPoolable() throws InterruptedException { + consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrainWithPoolable(0); + } + + void consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrainWithPoolable(int i) throws InterruptedException { + Scheduler allocatorScheduler = autoDispose(Schedulers.newParallel("poolable test allocator")); + Scheduler deliveryScheduler = autoDispose(Schedulers.newSingle("delivery")); + Scheduler acquire1Scheduler = autoDispose(Schedulers.newSingle("acquire1")); + Scheduler racerScheduler = autoDispose(Schedulers.fromExecutorService( + Executors.newFixedThreadPool(2, (r -> new Thread(r,"racer"))))); + + AtomicReference threadName = new AtomicReference<>(); + AtomicInteger newCount = new AtomicInteger(); + + PoolBuilder> testBuilder = poolableTestBuilder(1, 1, + Mono.fromCallable(() -> new PoolableTest(newCount.getAndIncrement())) + .subscribeOn(allocatorScheduler), + deliveryScheduler); + InstrumentedPool pool = testBuilder.lifo(); + + //the pool is started with one elements, and has capacity for 1. 
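
The pending-acquire order exercised in this file (FIFO via buildPool(), LIFO via the deprecated lifo() used here) is independent of the idle-reuse order introduced by this change: with MRU the next acquire() hands back the resource released last, while the default LRU hands back the one released first. That is why CommonPoolTest now parameterizes over all four combinations (lruFifo, lruLifo, mruFifo, mruLifo). A sketch of how the four variants come out of the builder, with a hypothetical String allocator and sketch-only class name:

import reactor.core.publisher.Mono;
import reactor.pool.InstrumentedPool;
import reactor.pool.PoolBuilder;
import reactor.pool.PoolConfig;

public class IdleOrderVariantsSketch {

	static PoolBuilder<String, PoolConfig<String>> builder() {
		// Hypothetical String allocator standing in for the PoolableTest resources used in the tests.
		return PoolBuilder.from(Mono.fromCallable(() -> "pooled resource"))
		                  .sizeBetween(0, 3);
	}

	@SuppressWarnings("deprecation")
	public static void main(String[] args) {
		// Idle-reuse order (LRU vs MRU) and pending-acquire order (FIFO vs LIFO) combine freely.
		InstrumentedPool<String> lruFifo = builder().idleResourceReuseLruOrder().buildPool();
		InstrumentedPool<String> mruFifo = builder().idleResourceReuseMruOrder().buildPool();
		InstrumentedPool<String> lruLifo = builder().idleResourceReuseLruOrder().lifo();
		InstrumentedPool<String> mruLifo = builder().idleResourceReuseMruOrder().lifo();

		lruFifo.dispose();
		mruFifo.dispose();
		lruLifo.dispose();
		mruLifo.dispose();
	}
}
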
+ //we actually first acquire that element so that next acquire will wait for a release + PooledRef uniqueSlot = pool.acquire().block(); + assertThat(uniqueSlot).isNotNull(); + + //we prepare next acquire + Mono firstBorrower = Mono.fromDirect(pool.withPoolable(Mono::just)); + Mono otherBorrower = Mono.fromDirect(pool.withPoolable(Mono::just)); + + CountDownLatch latch = new CountDownLatch(3); + + //we actually perform the acquire from its dedicated thread, capturing the thread on which the element will actually get delivered + acquire1Scheduler.schedule(() -> firstBorrower.subscribe(v -> threadName.set(Thread.currentThread().getName()) + , e -> latch.countDown(), latch::countDown)); + + //in parallel, we'll race a second acquire AND release the unique element (each on their dedicated threads) + //since LIFO we expect that if the release loses, it will server acquire1 + RaceTestUtils.race( + () -> otherBorrower.subscribe(v -> threadName.set(Thread.currentThread().getName()) + , e -> latch.countDown(), latch::countDown), + () -> { + uniqueSlot.release().block(); + latch.countDown(); + }, + racerScheduler); + latch.await(1, TimeUnit.SECONDS); + + //we expect that, consistently, the poolable is delivered on a `delivery` thread + assertThat(threadName.get()).as("round #" + i).startsWith("delivery-"); + + //2 elements MIGHT be created if the first acquire wins (since we're in auto-release mode) + assertThat(newCount.get()).as("1 or 2 elements created in round " + i).isIn(1, 2); + } + + @Test + void stillacquiredAfterPoolDisposedMaintainsCount() { + AtomicInteger cleanerCount = new AtomicInteger(); + InstrumentedPool pool = + from(Mono.fromCallable(PoolableTest::new)) + .sizeBetween(3, 3) + .releaseHandler(p -> Mono.fromRunnable(cleanerCount::incrementAndGet)) + .evictionPredicate((value, metadata) -> !value.isHealthy()) + .lifo(); + + PooledRef acquired1 = pool.acquire().block(); + PooledRef acquired2 = pool.acquire().block(); + PooledRef acquired3 = pool.acquire().block(); + + assertThat(acquired1).as("acquired1").isNotNull(); + assertThat(acquired2).as("acquired2").isNotNull(); + assertThat(acquired3).as("acquired3").isNotNull(); + + pool.dispose(); + + assertThat(pool.metrics().acquiredSize()).as("before releases").isEqualTo(3); + + acquired1.release().block(); + acquired2.release().block(); + acquired3.release().block(); + + assertThat(pool.metrics().acquiredSize()).as("after releases").isEqualTo(0); + } + + //see https://github.com/reactor/reactor-pool/issues/65 + @SuppressWarnings("FutureReturnValueIgnored") + @Tag("loops") + @ParameterizedTest + @CsvSource({"4, 1", "4, 100000", "10, 1", "10, 100000"}) + void concurrentAcquireCorrectlyAccountsAll(int parallelism, int loops) throws InterruptedException { + final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(parallelism); + autoDispose(executorService::shutdownNow); + + for (int l = 0; l < loops; l++) { + PoolBuilder> builder = from(Mono.just("foo")) + .sizeBetween(0, 100); + InstrumentedPool lifoPool = autoDispose(builder.lifo()); + CountDownLatch latch = new CountDownLatch(parallelism); + + for (int i = 0; i < parallelism; i++) { + executorService.submit(() -> { + lifoPool.acquire() + .block(); + latch.countDown(); + }); + } + boolean awaited = latch.await(1, TimeUnit.SECONDS); + assertThat(awaited).as("all concurrent acquire served in loop #" + l).isTrue(); + } + } +} \ No newline at end of file diff --git a/src/test/java/reactor/pool/SimpleLifoPoolTest.java 
b/src/test/java/reactor/pool/SimpleLifoPoolTest.java deleted file mode 100644 index fbe44c87..00000000 --- a/src/test/java/reactor/pool/SimpleLifoPoolTest.java +++ /dev/null @@ -1,804 +0,0 @@ -/* - * Copyright (c) 2018-Present Pivotal Software Inc, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.pool; - -import java.time.Duration; -import java.util.Arrays; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; - -import org.assertj.core.data.Offset; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.CsvSource; -import org.reactivestreams.Subscription; - -import reactor.core.Disposable; -import reactor.core.Disposables; -import reactor.core.publisher.BaseSubscriber; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; -import reactor.pool.TestUtils.PoolableTest; -import reactor.test.util.RaceTestUtils; -import reactor.util.function.Tuple2; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; -import static reactor.pool.PoolBuilder.from; - -/** - * @author Simon Baslé - */ -class SimpleLifoPoolTest { - - private Disposable.Composite disposeList; - - @BeforeEach - void initComposite() { - disposeList = Disposables.composite(); - } - - @AfterEach - void cleanup() { - disposeList.dispose(); - } - - T autoDispose(T toDispose) { - disposeList.add(toDispose); - return toDispose; - } - - //FIXME extract lifo-specific tests into CommonPoolTest - - //==utils for package-private config== - static final PoolConfig poolableTestConfig(int minSize, int maxSize, Mono allocator) { - return from(allocator) - .sizeBetween(minSize, maxSize) - .releaseHandler(pt -> Mono.fromRunnable(pt::clean)) - .evictionPredicate((value, metadata) -> !value.isHealthy()) - .buildConfig(); - } - - static final PoolConfig poolableTestConfig(int minSize, int maxSize, Mono allocator, Scheduler deliveryScheduler) { - return from(allocator) - .sizeBetween(minSize, maxSize) - .releaseHandler(pt -> Mono.fromRunnable(pt::clean)) - .evictionPredicate((value, metadata) -> !value.isHealthy()) - .acquisitionScheduler(deliveryScheduler) - .buildConfig(); - } - - static final PoolConfig poolableTestConfig(int minSize, int maxSize, Mono allocator, - Consumer additionalCleaner) { - return from(allocator) - .sizeBetween(minSize, maxSize) - 
.releaseHandler(poolableTest -> Mono.fromRunnable(() -> { - poolableTest.clean(); - additionalCleaner.accept(poolableTest); - })) - .evictionPredicate((value, metadata) -> !value.isHealthy()) - .buildConfig(); - } - //====== - - @Test - void demonstrateAcquireInScopePipeline() throws InterruptedException { - AtomicInteger counter = new AtomicInteger(); - AtomicReference releaseRef = new AtomicReference<>(); - - SimpleLifoPool pool = new SimpleLifoPool<>( - from(Mono.just("Hello Reactive World")) - .sizeBetween(0, 1) - .releaseHandler(s -> Mono.fromRunnable(()-> releaseRef.set(s))) - .buildConfig()); - - Flux words = pool.withPoolable(poolable -> Mono.just(poolable) - //simulate deriving a value from the resource (ie. query from DB connection) - .map(resource -> resource.split(" ")) - //then further process the derived value to produce multiple values (ie. rows from a query) - .flatMapIterable(Arrays::asList) - //and all that with latency - .delayElements(Duration.ofMillis(500))); - - words.subscribe(v -> counter.incrementAndGet()); - assertThat(counter).hasValue(0); - - Thread.sleep(1100); - //we're in the middle of processing the "rows" - assertThat(counter).as("before all emitted").hasValue(2); - assertThat(releaseRef).as("still acquiring").hasValue(null); - - Thread.sleep(500); - //we've finished processing, let's check resource has been automatically released - assertThat(counter).as("after all emitted").hasValue(3); - assertThat(pool.poolConfig.allocationStrategy().estimatePermitCount()).as("allocation permits").isZero(); - assertThat(pool.elements).as("available").hasSize(1); - assertThat(releaseRef).as("released").hasValue("Hello Reactive World"); - } - - @Nested - @DisplayName("Tests around the acquire() manual mode of acquiring") - @SuppressWarnings("ClassCanBeStatic") - class AcquireTest { - - @Test - @Tag("loops") - void allocatedReleasedOrAbortedIfCancelRequestRace_loop() throws InterruptedException { - AtomicInteger newCount = new AtomicInteger(); - AtomicInteger releasedCount = new AtomicInteger(); - for (int i = 0; i < 100; i++) { - allocatedReleasedOrAbortedIfCancelRequestRace(i, newCount, releasedCount, i % 2 == 0); - } - System.out.println("Total release of " + releasedCount.get() + " for " + newCount.get() + " created over 100 rounds"); - } - - @Test - void allocatedReleasedOrAbortedIfCancelRequestRace() throws InterruptedException { - allocatedReleasedOrAbortedIfCancelRequestRace(0, new AtomicInteger(), new AtomicInteger(), true); - allocatedReleasedOrAbortedIfCancelRequestRace(1, new AtomicInteger(), new AtomicInteger(), false); - - } - - @SuppressWarnings("FutureReturnValueIgnored") - void allocatedReleasedOrAbortedIfCancelRequestRace(int round, AtomicInteger newCount, AtomicInteger releasedCount, boolean cancelFirst) throws InterruptedException { - Scheduler scheduler = Schedulers.newParallel("poolable test allocator"); - - PoolConfig testConfig = poolableTestConfig(0, 1, - Mono.defer(() -> Mono.delay(Duration.ofMillis(50)).thenReturn(new PoolableTest(newCount.incrementAndGet()))) - .subscribeOn(scheduler), - pt -> releasedCount.incrementAndGet()); - SimpleLifoPool pool = new SimpleLifoPool<>(testConfig); - - //acquire the only element and capture the subscription, don't request just yet - CountDownLatch latch = new CountDownLatch(1); - final BaseSubscriber> baseSubscriber = new BaseSubscriber>() { - @Override - protected void hookOnSubscribe(Subscription subscription) { - //don't request - latch.countDown(); - } - }; - pool.acquire().subscribe(baseSubscriber); - 
latch.await(); - - final ExecutorService executorService = Executors.newFixedThreadPool(2); - if (cancelFirst) { - executorService.submit(baseSubscriber::cancel); - executorService.submit(baseSubscriber::requestUnbounded); - } - else { - executorService.submit(baseSubscriber::requestUnbounded); - executorService.submit(baseSubscriber::cancel); - } - - //release due to cancel is async, give it a bit of time - await().atMost(200, TimeUnit.MILLISECONDS).with().pollInterval(10, TimeUnit.MILLISECONDS) - .untilAsserted(() -> assertThat(releasedCount) - .as("released vs created in round " + round + (cancelFirst? " (cancel first)" : " (request first)")) - .hasValue(newCount.get())); - } - - @Test - void defaultThreadDeliveringWhenHasElements() throws InterruptedException { - AtomicReference threadName = new AtomicReference<>(); - Scheduler acquireScheduler = Schedulers.newSingle("acquire"); - PoolConfig testConfig = poolableTestConfig(1, 1, - Mono.fromCallable(PoolableTest::new) - .subscribeOn(Schedulers.newParallel("poolable test allocator"))); - SimpleLifoPool pool = new SimpleLifoPool<>(testConfig); - pool.warmup().block(); - - //the pool is started and warmed up with one available element - //we prepare to acquire it - Mono> borrower = pool.acquire(); - CountDownLatch latch = new CountDownLatch(1); - - //we actually request the acquire from a separate thread and see from which thread the element was delivered - acquireScheduler.schedule(() -> borrower.subscribe(v -> threadName.set(Thread.currentThread().getName()), e -> latch.countDown(), latch::countDown)); - latch.await(1, TimeUnit.SECONDS); - - assertThat(threadName.get()) - .startsWith("acquire-"); - } - - @Test - void defaultThreadDeliveringWhenNoElementsButNotFull() throws InterruptedException { - AtomicReference threadName = new AtomicReference<>(); - Scheduler acquireScheduler = Schedulers.newSingle("acquire"); - PoolConfig testConfig = poolableTestConfig(0, 1, - Mono.fromCallable(PoolableTest::new) - .subscribeOn(Schedulers.newParallel("poolable test allocator"))); - SimpleLifoPool pool = new SimpleLifoPool<>(testConfig); - - //the pool is started with no elements, and has capacity for 1 - //we prepare to acquire, which would allocate the element - Mono> borrower = pool.acquire(); - CountDownLatch latch = new CountDownLatch(1); - - //we actually request the acquire from a separate thread, but the allocation also happens in a dedicated thread - //we look at which thread the element was delivered from - acquireScheduler.schedule(() -> borrower.subscribe(v -> threadName.set(Thread.currentThread().getName()), e -> latch.countDown(), latch::countDown)); - latch.await(1, TimeUnit.SECONDS); - - assertThat(threadName.get()) - .startsWith("poolable test allocator-"); - } - - @Test - void defaultThreadDeliveringWhenNoElementsAndFull() throws InterruptedException { - AtomicReference threadName = new AtomicReference<>(); - Scheduler acquireScheduler = Schedulers.newSingle("acquire"); - Scheduler releaseScheduler = Schedulers.fromExecutorService( - Executors.newSingleThreadScheduledExecutor((r -> new Thread(r,"release")))); - PoolConfig testConfig = poolableTestConfig(1, 1, - Mono.fromCallable(PoolableTest::new) - .subscribeOn(Schedulers.newParallel("poolable test allocator"))); - SimpleLifoPool pool = new SimpleLifoPool<>(testConfig); - - //the pool is started with one elements, and has capacity for 1. 
- //we actually first acquire that element so that next acquire will wait for a release - PooledRef uniqueSlot = pool.acquire().block(); - assertThat(uniqueSlot).isNotNull(); - - //we prepare next acquire - Mono> borrower = pool.acquire(); - CountDownLatch latch = new CountDownLatch(1); - - //we actually perform the acquire from its dedicated thread, capturing the thread on which the element will actually get delivered - acquireScheduler.schedule(() -> borrower.subscribe(v -> threadName.set(Thread.currentThread().getName()), - e -> latch.countDown(), latch::countDown)); - //after a short while, we release the acquired unique element from a third thread - releaseScheduler.schedule(uniqueSlot.release()::block, 500, TimeUnit.MILLISECONDS); - latch.await(1, TimeUnit.SECONDS); - - assertThat(threadName.get()) - .isEqualTo("release"); - } - - //TODO add back acquire/release race tests? these are way harder with LIFO semantics - - @Test - void consistentThreadDeliveringWhenHasElements() throws InterruptedException { - Scheduler deliveryScheduler = Schedulers.newSingle("delivery"); - AtomicReference threadName = new AtomicReference<>(); - Scheduler acquireScheduler = Schedulers.newSingle("acquire"); - PoolConfig testConfig = poolableTestConfig(1, 1, - Mono.fromCallable(PoolableTest::new) - .subscribeOn(Schedulers.newParallel("poolable test allocator")), - deliveryScheduler); - SimpleLifoPool pool = new SimpleLifoPool<>(testConfig); - - //the pool is started with one available element - //we prepare to acquire it - Mono> borrower = pool.acquire(); - CountDownLatch latch = new CountDownLatch(1); - - //we actually request the acquire from a separate thread and see from which thread the element was delivered - acquireScheduler.schedule(() -> borrower.subscribe(v -> threadName.set(Thread.currentThread().getName()), e -> latch.countDown(), latch::countDown)); - latch.await(1, TimeUnit.SECONDS); - - assertThat(threadName.get()) - .startsWith("delivery-"); - } - - @Test - void consistentThreadDeliveringWhenNoElementsButNotFull() throws InterruptedException { - Scheduler deliveryScheduler = Schedulers.newSingle("delivery"); - AtomicReference threadName = new AtomicReference<>(); - Scheduler acquireScheduler = Schedulers.newSingle("acquire"); - PoolConfig testConfig = poolableTestConfig(0, 1, - Mono.fromCallable(PoolableTest::new) - .subscribeOn(Schedulers.newParallel("poolable test allocator")), - deliveryScheduler); - SimpleLifoPool pool = new SimpleLifoPool<>(testConfig); - - //the pool is started with no elements, and has capacity for 1 - //we prepare to acquire, which would allocate the element - Mono> borrower = pool.acquire(); - CountDownLatch latch = new CountDownLatch(1); - - //we actually request the acquire from a separate thread, but the allocation also happens in a dedicated thread - //we look at which thread the element was delivered from - acquireScheduler.schedule(() -> borrower.subscribe(v -> threadName.set(Thread.currentThread().getName()), e -> latch.countDown(), latch::countDown)); - latch.await(1, TimeUnit.SECONDS); - - assertThat(threadName.get()) - .startsWith("delivery-"); - } - - @Test - void consistentThreadDeliveringWhenNoElementsAndFull() throws InterruptedException { - Scheduler deliveryScheduler = Schedulers.newSingle("delivery"); - AtomicReference threadName = new AtomicReference<>(); - Scheduler acquireScheduler = Schedulers.newSingle("acquire"); - Scheduler releaseScheduler = Schedulers.fromExecutorService( - Executors.newSingleThreadScheduledExecutor((r -> new 
Thread(r,"release")))); - PoolConfig testConfig = poolableTestConfig(1, 1, - Mono.fromCallable(PoolableTest::new) - .subscribeOn(Schedulers.newParallel("poolable test allocator")), - deliveryScheduler); - SimpleLifoPool pool = new SimpleLifoPool<>(testConfig); - - //the pool is started with one elements, and has capacity for 1. - //we actually first acquire that element so that next acquire will wait for a release - PooledRef uniqueSlot = pool.acquire().block(); - assertThat(uniqueSlot).isNotNull(); - - //we prepare next acquire - Mono> borrower = pool.acquire(); - CountDownLatch latch = new CountDownLatch(1); - - //we actually perform the acquire from its dedicated thread, capturing the thread on which the element will actually get delivered - acquireScheduler.schedule(() -> borrower.subscribe(v -> threadName.set(Thread.currentThread().getName()), - e -> latch.countDown(), latch::countDown)); - //after a short while, we release the acquired unique element from a third thread - releaseScheduler.schedule(uniqueSlot.release()::block, 500, TimeUnit.MILLISECONDS); - latch.await(1, TimeUnit.SECONDS); - - assertThat(threadName.get()) - .startsWith("delivery-"); - } - - @Test - @Tag("loops") - void consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrain_loop() throws InterruptedException { - for (int i = 0; i < 10_000; i++) { - consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrain(i); - } - } - - @Test - void consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrain() throws InterruptedException { - consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrain(0); - } - - void consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrain(int i) throws InterruptedException { - Scheduler allocatorScheduler = Schedulers.newParallel("poolable test allocator"); - Scheduler deliveryScheduler = Schedulers.newSingle("delivery"); - Scheduler acquire1Scheduler = Schedulers.newSingle("acquire1"); - Scheduler racerScheduler = Schedulers.fromExecutorService(Executors.newFixedThreadPool(2, r -> new Thread(r, "racer"))); - try { - AtomicReference threadName = new AtomicReference<>(); - AtomicInteger newCount = new AtomicInteger(); - - PoolConfig testConfig = poolableTestConfig(1, 1, - Mono.fromCallable(() -> new PoolableTest(newCount.getAndIncrement())) - .subscribeOn(allocatorScheduler), - deliveryScheduler); - SimpleLifoPool pool = new SimpleLifoPool<>(testConfig); - - //the pool is started with one elements, and has capacity for 1. - //we actually first acquire that element so that next acquire will wait for a release - PooledRef uniqueSlot = pool.acquire().block(); - assertThat(uniqueSlot).isNotNull(); - - //we prepare two more acquires - Mono> firstBorrower = pool.acquire(); - Mono> secondBorrower = pool.acquire(); - - CountDownLatch latch = new CountDownLatch(1); - - //we'll enqueue a first acquire from a first thread - //in parallel, we'll race a second acquire AND release the unique element (each on their dedicated threads) - //we expect the release might sometimes win, which would mean acquire 1 would get served. 
mostly we want to verify delivery thread though - acquire1Scheduler.schedule(() -> firstBorrower.subscribe(v -> threadName.compareAndSet(null, Thread.currentThread().getName()) - , e -> latch.countDown(), latch::countDown)); - RaceTestUtils.race(() -> secondBorrower.subscribe(v -> threadName.compareAndSet(null, Thread.currentThread().getName()) - , e -> latch.countDown(), latch::countDown), - uniqueSlot.release()::block); - - latch.await(1, TimeUnit.SECONDS); - - //we expect that, consistently, the poolable is delivered on a `delivery` thread - assertThat(threadName.get()).as("round #" + i).startsWith("delivery-"); - - //we expect that only 1 element was created - assertThat(newCount).as("elements created in round " + i).hasValue(1); - } - finally { - allocatorScheduler.dispose(); - deliveryScheduler.dispose(); - racerScheduler.dispose(); - acquire1Scheduler.dispose(); - } - } - } - - @Nested - @DisplayName("Tests around the withPoolable(Function) mode of acquiring") - @SuppressWarnings("ClassCanBeStatic") - class AcquireInScopeTest { - - @Test - @DisplayName("acquire delays instead of allocating past maxSize") - void acquireDelaysNotAllocate() { - AtomicInteger newCount = new AtomicInteger(); - SimpleLifoPool pool = new SimpleLifoPool<>(poolableTestConfig(2, 3, - Mono.defer(() -> Mono.just(new PoolableTest(newCount.incrementAndGet()))))); - - pool.withPoolable(poolable -> Mono.just(poolable).delayElement(Duration.ofMillis(500))).subscribe(); - pool.withPoolable(poolable -> Mono.just(poolable).delayElement(Duration.ofMillis(500))).subscribe(); - pool.withPoolable(poolable -> Mono.just(poolable).delayElement(Duration.ofMillis(500))).subscribe(); - - final Tuple2 tuple2 = pool.withPoolable(Mono::just).elapsed().blockLast(); - - assertThat(tuple2).isNotNull(); - - assertThat(tuple2.getT1()).as("pending for 500ms").isCloseTo(500L, Offset.offset(50L)); - assertThat(tuple2.getT2().usedUp).as("discarded twice").isEqualTo(2); - assertThat(tuple2.getT2().id).as("id").isLessThan(4); - } - - @Test - @Tag("loops") - void allocatedReleasedOrAbortedIfCancelRequestRace_loop() throws InterruptedException { - AtomicInteger newCount = new AtomicInteger(); - AtomicInteger releasedCount = new AtomicInteger(); - for (int i = 0; i < 100; i++) { - allocatedReleasedOrAbortedIfCancelRequestRace(i, newCount, releasedCount, i % 2 == 0); - } - System.out.println("Total release of " + releasedCount.get() + " for " + newCount.get() + " created over 100 rounds"); - } - - @Test - void allocatedReleasedOrAbortedIfCancelRequestRace() throws InterruptedException { - allocatedReleasedOrAbortedIfCancelRequestRace(0, new AtomicInteger(), new AtomicInteger(), true); - allocatedReleasedOrAbortedIfCancelRequestRace(1, new AtomicInteger(), new AtomicInteger(), false); - - } - - @SuppressWarnings("FutureReturnValueIgnored") - void allocatedReleasedOrAbortedIfCancelRequestRace(int round, AtomicInteger newCount, AtomicInteger releasedCount, boolean cancelFirst) throws InterruptedException { - Scheduler scheduler = Schedulers.newParallel("poolable test allocator"); - - PoolConfig testConfig = poolableTestConfig(0, 1, - Mono.defer(() -> Mono.delay(Duration.ofMillis(50)).thenReturn(new PoolableTest(newCount.incrementAndGet()))) - .subscribeOn(scheduler), - pt -> releasedCount.incrementAndGet()); - SimpleLifoPool pool = new SimpleLifoPool<>(testConfig); - - //acquire the only element and capture the subscription, don't request just yet - CountDownLatch latch = new CountDownLatch(1); - final BaseSubscriber baseSubscriber = new 
BaseSubscriber() { - @Override - protected void hookOnSubscribe(Subscription subscription) { - //don't request - latch.countDown(); - } - }; - pool.withPoolable(Mono::just).subscribe(baseSubscriber); - latch.await(); - - final ExecutorService executorService = Executors.newFixedThreadPool(2); - if (cancelFirst) { - executorService.submit(baseSubscriber::cancel); - executorService.submit(baseSubscriber::requestUnbounded); - } - else { - executorService.submit(baseSubscriber::requestUnbounded); - executorService.submit(baseSubscriber::cancel); - } - - //release due to cancel is async, give it a bit of time - await().atMost(100, TimeUnit.MILLISECONDS).with().pollInterval(10, TimeUnit.MILLISECONDS) - .untilAsserted(() -> assertThat(releasedCount) - .as("released vs created in round " + round + (cancelFirst? " (cancel first)" : " (request first)")) - .hasValue(newCount.get())); - } - - @Test - void defaultThreadDeliveringWhenHasElements() throws InterruptedException { - AtomicReference threadName = new AtomicReference<>(); - Scheduler acquireScheduler = Schedulers.newSingle("acquire"); - PoolConfig testConfig = poolableTestConfig(1, 1, - Mono.fromCallable(PoolableTest::new) - .subscribeOn(Schedulers.newParallel("poolable test allocator"))); - SimpleLifoPool pool = new SimpleLifoPool<>(testConfig); - pool.warmup().block(); - - //the pool is started and warmed up with one available element - //we prepare to acquire it - Mono borrower = Mono.fromDirect(pool.withPoolable(Mono::just)); - CountDownLatch latch = new CountDownLatch(1); - - //we actually request the acquire from a separate thread and see from which thread the element was delivered - acquireScheduler.schedule(() -> borrower.subscribe(v -> threadName.set(Thread.currentThread().getName()), e -> latch.countDown(), latch::countDown)); - latch.await(1, TimeUnit.SECONDS); - - assertThat(threadName.get()) - .startsWith("acquire-"); - } - - @Test - void defaultThreadDeliveringWhenNoElementsButNotFull() throws InterruptedException { - AtomicReference threadName = new AtomicReference<>(); - Scheduler acquireScheduler = Schedulers.newSingle("acquire"); - PoolConfig testConfig = poolableTestConfig(0, 1, - Mono.fromCallable(PoolableTest::new) - .subscribeOn(Schedulers.newParallel("poolable test allocator"))); - SimpleLifoPool pool = new SimpleLifoPool<>(testConfig); - - //the pool is started with no elements, and has capacity for 1 - //we prepare to acquire, which would allocate the element - Mono borrower = Mono.fromDirect(pool.withPoolable(Mono::just)); - CountDownLatch latch = new CountDownLatch(1); - - //we actually request the acquire from a separate thread, but the allocation also happens in a dedicated thread - //we look at which thread the element was delivered from - acquireScheduler.schedule(() -> borrower.subscribe(v -> threadName.set(Thread.currentThread().getName()), e -> latch.countDown(), latch::countDown)); - latch.await(1, TimeUnit.SECONDS); - - assertThat(threadName.get()) - .startsWith("poolable test allocator-"); - } - - @Test - void defaultThreadDeliveringWhenNoElementsAndFull() throws InterruptedException { - AtomicReference threadName = new AtomicReference<>(); - Scheduler acquireScheduler = Schedulers.newSingle("acquire"); - Scheduler releaseScheduler = Schedulers.fromExecutorService( - Executors.newSingleThreadScheduledExecutor((r -> new Thread(r,"release")))); - PoolConfig testConfig = poolableTestConfig(1, 1, - Mono.fromCallable(PoolableTest::new) - .subscribeOn(Schedulers.newParallel("poolable test allocator"))); - 
-            SimpleLifoPool<PoolableTest> pool = new SimpleLifoPool<>(testConfig);
-
-            //the pool is started with one element, and has capacity for 1.
-            //we actually first acquire that element so that next acquire will wait for a release
-            PooledRef<PoolableTest> uniqueSlot = pool.acquire().block();
-            assertThat(uniqueSlot).isNotNull();
-
-            //we prepare next acquire
-            Mono<PoolableTest> borrower = Mono.fromDirect(pool.withPoolable(Mono::just));
-            CountDownLatch latch = new CountDownLatch(1);
-
-            //we actually perform the acquire from its dedicated thread, capturing the thread on which the element will actually get delivered
-            acquireScheduler.schedule(() -> borrower.subscribe(v -> threadName.set(Thread.currentThread().getName()),
-                    e -> latch.countDown(), latch::countDown));
-            //after a short while, we release the acquired unique element from a third thread
-            releaseScheduler.schedule(uniqueSlot.release()::block, 500, TimeUnit.MILLISECONDS);
-            latch.await(1, TimeUnit.SECONDS);
-
-            assertThat(threadName.get())
-                    .isEqualTo("release");
-        }
-
-        //TODO add back acquire/release race tests? these are way harder with LIFO semantics
-
-        @Test
-        void consistentThreadDeliveringWhenHasElements() throws InterruptedException {
-            Scheduler deliveryScheduler = Schedulers.newSingle("delivery");
-            AtomicReference<String> threadName = new AtomicReference<>();
-            Scheduler acquireScheduler = Schedulers.newSingle("acquire");
-            PoolConfig<PoolableTest> testConfig = poolableTestConfig(1, 1,
-                    Mono.fromCallable(PoolableTest::new)
-                        .subscribeOn(Schedulers.newParallel("poolable test allocator")),
-                    deliveryScheduler);
-            SimpleLifoPool<PoolableTest> pool = new SimpleLifoPool<>(testConfig);
-
-            //the pool is started with one available element
-            //we prepare to acquire it
-            Mono<PoolableTest> borrower = Mono.fromDirect(pool.withPoolable(Mono::just));
-            CountDownLatch latch = new CountDownLatch(1);
-
-            //we actually request the acquire from a separate thread and see from which thread the element was delivered
-            acquireScheduler.schedule(() -> borrower.subscribe(v -> threadName.set(Thread.currentThread().getName()), e -> latch.countDown(), latch::countDown));
-            latch.await(1, TimeUnit.SECONDS);
-
-            assertThat(threadName.get())
-                    .startsWith("delivery-");
-        }
-
-        @Test
-        void consistentThreadDeliveringWhenNoElementsButNotFull() throws InterruptedException {
-            Scheduler deliveryScheduler = Schedulers.newSingle("delivery");
-            AtomicReference<String> threadName = new AtomicReference<>();
-            Scheduler acquireScheduler = Schedulers.newSingle("acquire");
-            PoolConfig<PoolableTest> testConfig = poolableTestConfig(0, 1,
-                    Mono.fromCallable(PoolableTest::new)
-                        .subscribeOn(Schedulers.newParallel("poolable test allocator")),
-                    deliveryScheduler);
-            SimpleLifoPool<PoolableTest> pool = new SimpleLifoPool<>(testConfig);
-
-            //the pool is started with no elements, and has capacity for 1
-            //we prepare to acquire, which would allocate the element
-            Mono<PoolableTest> borrower = Mono.fromDirect(pool.withPoolable(Mono::just));
-            CountDownLatch latch = new CountDownLatch(1);
-
-            //we actually request the acquire from a separate thread, but the allocation also happens in a dedicated thread
-            //we look at which thread the element was delivered from
-            acquireScheduler.schedule(() -> borrower.subscribe(v -> threadName.set(Thread.currentThread().getName()), e -> latch.countDown(), latch::countDown));
-            latch.await(1, TimeUnit.SECONDS);
-
-            assertThat(threadName.get())
-                    .startsWith("delivery-");
-        }
-
-        @Test
-        void consistentThreadDeliveringWhenNoElementsAndFull() throws InterruptedException {
-            Scheduler deliveryScheduler = Schedulers.newSingle("delivery");
-            AtomicReference<String> threadName = new AtomicReference<>();
-            Scheduler acquireScheduler = Schedulers.newSingle("acquire");
-            Scheduler releaseScheduler = Schedulers.fromExecutorService(
-                    Executors.newSingleThreadScheduledExecutor((r -> new Thread(r,"release"))));
-            PoolConfig<PoolableTest> testConfig = poolableTestConfig(1, 1,
-                    Mono.fromCallable(PoolableTest::new)
-                        .subscribeOn(Schedulers.newParallel("poolable test allocator")),
-                    deliveryScheduler);
-            SimpleLifoPool<PoolableTest> pool = new SimpleLifoPool<>(testConfig);
-
-            //the pool is started with one element, and has capacity for 1.
-            //we actually first acquire that element so that next acquire will wait for a release
-            PooledRef<PoolableTest> uniqueSlot = pool.acquire().block();
-            assertThat(uniqueSlot).isNotNull();
-
-            //we prepare next acquire
-            Mono<PoolableTest> borrower = Mono.fromDirect(pool.withPoolable(Mono::just));
-            CountDownLatch latch = new CountDownLatch(1);
-
-            //we actually perform the acquire from its dedicated thread, capturing the thread on which the element will actually get delivered
-            acquireScheduler.schedule(() -> borrower.subscribe(v -> threadName.set(Thread.currentThread().getName()),
-                    e -> latch.countDown(), latch::countDown));
-            //after a short while, we release the acquired unique element from a third thread
-            releaseScheduler.schedule(uniqueSlot.release()::block, 500, TimeUnit.MILLISECONDS);
-            latch.await(1, TimeUnit.SECONDS);
-
-            assertThat(threadName.get())
-                    .startsWith("delivery-");
-        }
-
-        @Test
-        @Tag("loops")
-        void consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrain_loop() throws InterruptedException {
-            for (int i = 0; i < 10_000; i++) {
-                consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrain(i);
-            }
-        }
-
-        @Test
-        void consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrain() throws InterruptedException {
-            consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrain(0);
-        }
-
-        void consistentThreadDeliveringWhenNoElementsAndFullAndRaceDrain(int i) throws InterruptedException {
-            Scheduler allocatorScheduler = Schedulers.newParallel("poolable test allocator");
-            Scheduler deliveryScheduler = Schedulers.newSingle("delivery");
-            Scheduler acquire1Scheduler = Schedulers.newSingle("acquire1");
-            Scheduler racerScheduler = Schedulers.fromExecutorService(
-                    Executors.newFixedThreadPool(2, (r -> new Thread(r,"racer"))));
-
-            try {
-                AtomicReference<String> threadName = new AtomicReference<>();
-                AtomicInteger newCount = new AtomicInteger();
-
-
-                PoolConfig<PoolableTest> testConfig = poolableTestConfig(1, 1,
-                        Mono.fromCallable(() -> new PoolableTest(newCount.getAndIncrement()))
-                            .subscribeOn(allocatorScheduler),
-                        deliveryScheduler);
-                SimpleLifoPool<PoolableTest> pool = new SimpleLifoPool<>(testConfig);
-
-                //the pool is started with one element, and has capacity for 1.
-                //we actually first acquire that element so that next acquire will wait for a release
-                PooledRef<PoolableTest> uniqueSlot = pool.acquire().block();
-                assertThat(uniqueSlot).isNotNull();
-
-                //we prepare next acquire
-                Mono<PoolableTest> firstBorrower = Mono.fromDirect(pool.withPoolable(Mono::just));
-                Mono<PoolableTest> otherBorrower = Mono.fromDirect(pool.withPoolable(Mono::just));
-
-                CountDownLatch latch = new CountDownLatch(3);
-
-                //we actually perform the acquire from its dedicated thread, capturing the thread on which the element will actually get delivered
-                acquire1Scheduler.schedule(() -> firstBorrower.subscribe(v -> threadName.set(Thread.currentThread().getName())
-                        , e -> latch.countDown(), latch::countDown));
-
-                //in parallel, we'll race a second acquire AND release the unique element (each on their dedicated threads)
-                //since LIFO we expect that if the release loses, it will serve acquire1
-                RaceTestUtils.race(
-                        () -> otherBorrower.subscribe(v -> threadName.set(Thread.currentThread().getName())
-                                , e -> latch.countDown(), latch::countDown),
-                        () -> {
-                            uniqueSlot.release().block();
-                            latch.countDown();
-                        },
-                        racerScheduler);
-                latch.await(1, TimeUnit.SECONDS);
-
-                //we expect that, consistently, the poolable is delivered on a `delivery` thread
-                assertThat(threadName.get()).as("round #" + i).startsWith("delivery-");
-
-                //2 elements MIGHT be created if the first acquire wins (since we're in auto-release mode)
-                assertThat(newCount.get()).as("1 or 2 elements created in round " + i).isIn(1, 2);
-            }
-            finally {
-                allocatorScheduler.dispose();
-                deliveryScheduler.dispose();
-                acquire1Scheduler.dispose();
-                racerScheduler.dispose();
-            }
-        }
-    }
-
-    @Test
-    void stillacquiredAfterPoolDisposedMaintainsCount() {
-        AtomicInteger cleanerCount = new AtomicInteger();
-        SimpleLifoPool<PoolableTest> pool = new SimpleLifoPool<>(
-                from(Mono.fromCallable(PoolableTest::new))
-                        .sizeBetween(3, 3)
-                        .releaseHandler(p -> Mono.fromRunnable(cleanerCount::incrementAndGet))
-                        .evictionPredicate((value, metadata) -> !value.isHealthy())
-                        .buildConfig());
-
-        PooledRef<PoolableTest> acquired1 = pool.acquire().block();
-        PooledRef<PoolableTest> acquired2 = pool.acquire().block();
-        PooledRef<PoolableTest> acquired3 = pool.acquire().block();
-
-        assertThat(acquired1).as("acquired1").isNotNull();
-        assertThat(acquired2).as("acquired2").isNotNull();
-        assertThat(acquired3).as("acquired3").isNotNull();
-
-        pool.dispose();
-
-        assertThat(pool.acquired).as("before releases").isEqualTo(3);
-
-        acquired1.release().block();
-        acquired2.release().block();
-        acquired3.release().block();
-
-        assertThat(pool.acquired).as("after releases").isEqualTo(0);
-    }
-
-    @SuppressWarnings("FutureReturnValueIgnored")
-    @ParameterizedTest
-    @CsvSource({"4, 1", "4, 100000", "10, 1", "10, 100000"})
-    //see https://github.com/reactor/reactor-pool/issues/65
-    void concurrentAcquireCorrectlyAccountsAll(int parallelism, int loops) throws InterruptedException {
-        final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(parallelism);
-        autoDispose(executorService::shutdownNow);
-
-        for (int l = 0; l < loops; l++) {
-            PoolConfig<String> config = PoolBuilder.from(Mono.just("foo"))
-                    .sizeBetween(0, 100)
-                    .buildConfig();
-            SimpleFifoPool<String> fifoPool = autoDispose(new SimpleFifoPool<>(config));
-            CountDownLatch latch = new CountDownLatch(parallelism);
-
-            for (int i = 0; i < parallelism; i++) {
-                executorService.submit(() -> {
-                    fifoPool.acquire()
-                            .block();
-                    latch.countDown();
-                });
-            }
-            boolean awaited = latch.await(1, TimeUnit.SECONDS);
-            assertThat(awaited).as("all concurrent acquire served in loop #" + l).isTrue();
-        }
-    }
-}
\ No newline at end of file

From c9c78bf9237414254c8cef9d11706f89409bb2c8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Simon=20Basl=C3=A9?=
Date: Wed, 5 Aug 2020 16:58:12 +0200
Subject: [PATCH 2/2] fix #94 Deprecate fifo vs lifo distinction, preparing to remove lifo

---
 src/main/java/reactor/pool/PoolBuilder.java | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/main/java/reactor/pool/PoolBuilder.java b/src/main/java/reactor/pool/PoolBuilder.java
index d9de2e36..79361119 100644
--- a/src/main/java/reactor/pool/PoolBuilder.java
+++ b/src/main/java/reactor/pool/PoolBuilder.java
@@ -359,7 +359,9 @@ public InstrumentedPool<POOLABLE> buildPool() {
      * which is used when resources ARE available at the instant the {@link Pool#acquire()} is attempted.
      *
      * @return a {@link Pool} with LIFO pending acquire ordering
+     * @deprecated use {@link #buildPool()} instead, the FIFO vs LIFO distinction is to be removed in 0.3.x
      */
+    @Deprecated
     public InstrumentedPool<POOLABLE> lifo() {
         return new SimpleDequePool<>(this.buildConfig(), false);
     }
@@ -373,7 +375,9 @@ public InstrumentedPool<POOLABLE> lifo() {
      * which is used when resources ARE available at the instant the {@link Pool#acquire()} is attempted.
      *
      * @return a {@link Pool} with FIFO pending acquire ordering
+     * @deprecated use {@link #buildPool()} instead, to be removed in 0.3.x
      */
+    @Deprecated
     public InstrumentedPool<POOLABLE> fifo() {
         return buildPool();
     }
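
Note on migration (illustrative only, not part of either patch): with fifo() and lifo() now deprecated, callers are expected to build pools through buildPool(), which keeps the FIFO pending-acquire ordering that fifo() already delegated to. The sketch below assumes a trivial Mono.just("foo") allocator in the style of the tests above; the class name, variable names and sizeBetween bounds are placeholders rather than anything prescribed by the patches.

    import reactor.core.publisher.Mono;
    import reactor.pool.InstrumentedPool;
    import reactor.pool.PoolBuilder;
    import reactor.pool.PooledRef;

    public class BuildPoolMigrationSketch {

        public static void main(String[] args) {
            //previously: PoolBuilder.from(...).fifo() or .lifo(); both are now @Deprecated
            InstrumentedPool<String> pool = PoolBuilder
                    .from(Mono.just("foo"))
                    .sizeBetween(0, 100)
                    .buildPool(); //same FIFO pending-acquire ordering as the former fifo()

            PooledRef<String> ref = pool.acquire().block(); //borrow a resource from the pool
            //... use the pooled resource here ...
            ref.release().block(); //hand the resource back to the pool

            pool.dispose();
        }
    }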