Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rehabilitate quadratic pool #12711

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
Expand All @@ -36,6 +37,7 @@
import org.eclipse.jetty.io.internal.QueuedPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ConcurrentPool;
import org.eclipse.jetty.util.MathUtils;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
Expand Down Expand Up @@ -224,7 +226,7 @@ public RetainableByteBuffer.Mutable acquire(int size, boolean direct)
return RetainableByteBuffer.wrap(BufferUtil.allocate(size, direct));
}

bucket.recordAcquire();
bucket.recordAcquire(size);

// Try to acquire a pooled entry.
Pool.Entry<RetainableByteBuffer.Pooled> entry = bucket.getPool().acquire();
Expand Down Expand Up @@ -503,6 +505,7 @@ public String toString()
private class RetainedBucket
{
private final LongAdder _acquires = new LongAdder();
private final LongAdder _totalAcquired = new LongAdder();
private final LongAdder _pooled = new LongAdder();
private final LongAdder _nonPooled = new LongAdder();
private final LongAdder _evicts = new LongAdder();
Expand All @@ -523,10 +526,13 @@ private RetainedBucket(int capacity, int poolSize)
_capacity = capacity;
}

public void recordAcquire()
public void recordAcquire(int size)
{
if (isStatisticsEnabled())
{
_acquires.increment();
_totalAcquired.add(size);
}
}

public void recordEvict()
Expand Down Expand Up @@ -589,6 +595,7 @@ private int evict()
public void clear()
{
_acquires.reset();
_totalAcquired.reset();
_pooled.reset();
_nonPooled.reset();
_evicts.reset();
Expand All @@ -602,24 +609,28 @@ public String toString()
{
int entries = 0;
int inUse = 0;
for (Pool.Entry<RetainableByteBuffer.Pooled> entry : getPool().stream().toList())
Iterator<Pool.Entry<RetainableByteBuffer.Pooled>> iterator = getPool().stream().iterator();
while (iterator.hasNext())
{
Pool.Entry<RetainableByteBuffer.Pooled> entry = iterator.next();
entries++;
if (entry.isInUse())
inUse++;
}

long pooled = _pooled.longValue();
long acquires = _acquires.longValue();
float hitRatio = acquires == 0 ? Float.NaN : pooled * 100F / acquires;
return String.format("%s{capacity=%d,in-use=%d/%d,pooled/acquires=%d/%d(%.3f%%),non-pooled/evicts/removes/releases=%d/%d/%d/%d}",
float hitRatio = acquires == 0L ? Float.NaN : pooled * 100F / acquires;
long avgSize = acquires == 0L ? 0L : _totalAcquired.longValue() / acquires;
return String.format("%s{capacity=%d,in-use=%d/%d,pooled/acquires=%d/%d(%.3f%%),avgsize=%d,non-pooled/evicts/removes/releases=%d/%d/%d/%d}",
super.toString(),
getCapacity(),
inUse,
entries,
pooled,
acquires,
hitRatio,
avgSize,
_nonPooled.longValue(),
_evicts.longValue(),
_removes.longValue(),
Expand Down Expand Up @@ -719,38 +730,113 @@ protected void acquire()
}
}

/**
* A variant of the {@link ArrayByteBufferPool} that
* uses a predefined set of buckets of buffers.
*/
public static class WithBucketCapacities extends ArrayByteBufferPool
{
public WithBucketCapacities(int... capacities)
{
this(0L, 0L, capacities);
}

public WithBucketCapacities(long maxHeapMemory, long maxDirectMemory, int... capacities)
{
super(-1, 1, sort(capacities)[capacities.length - 1], Integer.MAX_VALUE, maxHeapMemory, maxDirectMemory,
c -> floorBucketIndexFor(c, capacities), i -> bucketCapacityForIndex(i, capacities));
}

private static int[] sort(int... values)
{
if (values.length == 0)
throw new IllegalArgumentException("At least one capacity is needed");
Arrays.sort(values);
return values;
}

private static int bucketCapacityForIndex(int idx, int... capacities)
{
if (idx >= capacities.length)
{
// An index over the capacities array's length is considered
// to refer to a multiple of the largest configured capacity;
// this logic is only meant for recordNoBucketAcquire().
int largestCapacity = capacities[capacities.length - 1];
int virtualIdx = idx - (capacities.length - 1);
return (virtualIdx + 1) * largestCapacity;
}
return capacities[idx];
}

private static int floorBucketIndexFor(int capacity, int... capacities)
{
int largestCapacity = capacities[capacities.length - 1];
if (capacity > largestCapacity)
{
// A capacity over the largest configured capacity returns an
// index that corresponds to where in the capacities array it
// would stand if the latter had more entries that would all
// be multiples of the largest configured capacity;
// this logic is only meant for recordNoBucketAcquire().
int remainder = capacity % largestCapacity != 0 ? 1 : 0;
int overLargestCapacityFactor = (capacity / largestCapacity) + remainder;
return overLargestCapacityFactor - 1 + capacities.length - 1;
}

int idx = 0;
for (int i = 0; i < capacities.length; i++)
{
idx = i;
if (capacities[i] > capacity)
break;
}
return idx;
}
}

/**
* A variant of the {@link ArrayByteBufferPool} that
* uses buckets of buffers that increase in size by a power of
* 2 (e.g. 1k, 2k, 4k, 8k, etc.).
* @deprecated Usage of {@code Quadratic} is often wasteful of additional space and can increase contention on
lorban marked this conversation as resolved.
Show resolved Hide resolved
* the larger buffers.
*/
@Deprecated(forRemoval = true, since = "12.1.0")
public static class Quadratic extends ArrayByteBufferPool
{
public Quadratic()
{
this(0, -1, Integer.MAX_VALUE);
this(-1, -1, Integer.MAX_VALUE);
}

public Quadratic(int minCapacity, int maxCapacity, int maxBucketSize)
{
this(minCapacity, maxCapacity, maxBucketSize, -1L, -1L);
this(minCapacity, maxCapacity, maxBucketSize, 0L, 0L);
}

public Quadratic(int minCapacity, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity,
-1,
maxCapacity,
computeMinCapacity(minCapacity),
computeMaxCapacity(maxCapacity),
maxBucketSize,
maxHeapMemory,
maxDirectMemory,
c -> 32 - Integer.numberOfLeadingZeros(c - 1),
i -> 1 << i
// The bucket indices are the powers of 2, but those powers up to minCapacity are skipped so they must be
// substracted when computing the index and added when computing the capacity; so if minCapacity is 1024, any
// number from 0 to 1024 must return index 0, and index 0 must return capacity 1024.
c -> Integer.SIZE - Integer.numberOfLeadingZeros(c - 1) - MathUtils.ceilLog2(computeMinCapacity(minCapacity)),
i -> 1 << i + MathUtils.ceilLog2(computeMinCapacity(minCapacity))
);
}

private static int computeMinCapacity(int minCapacity)
{
return minCapacity <= 0 ? 1024 : minCapacity;
}

private static int computeMaxCapacity(int maxCapacity)
{
return maxCapacity <= 0 ? 65536 : maxCapacity;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import java.nio.ByteBuffer;

import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -106,7 +106,7 @@ private void tryExpandBufferCapacity(int remaining)
if (remaining <= capacityLeft)
return;
int need = remaining - capacityLeft;
_currentSize = Math.min(_maxSize, TypeUtil.ceilToNextPowerOfTwo(_currentSize + need));
_currentSize = Math.min(_maxSize, MathUtils.ceilToNextPowerOfTwo(_currentSize + need));

if (_retainableByteBuffer != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,44 +391,126 @@ public void testAcquireRelease()
}

@Test
@Deprecated(forRemoval = true)
public void testQuadraticPool()
public void testQuadraticPoolBucketSizes()
{
ArrayByteBufferPool pool = new ArrayByteBufferPool.Quadratic();

RetainableByteBuffer retain5 = pool.acquire(5, false);
retain5.release();
RetainableByteBuffer retain6 = pool.acquire(6, false);
assertThat(retain6, not(sameInstance(retain5)));
assertThat(retain6.getByteBuffer(), sameInstance(retain5.getByteBuffer()));
retain6.release();
RetainableByteBuffer retain9 = pool.acquire(9, false);
assertThat(retain9, not(sameInstance(retain5)));
retain9.release();

assertThat(pool.acquire(1, false).capacity(), is(1));
assertThat(pool.acquire(2, false).capacity(), is(2));
RetainableByteBuffer b3 = pool.acquire(3, false);
assertThat(b3.capacity(), is(4));
RetainableByteBuffer b4 = pool.acquire(4, false);
assertThat(b4.capacity(), is(4));

int capacity = 4;
while (true)
{
RetainableByteBuffer b = pool.acquire(capacity - 1, false);
assertThat(b.capacity(), Matchers.is(capacity));
b = pool.acquire(capacity, false);
assertThat(b.capacity(), Matchers.is(capacity));
ArrayByteBufferPool pool1 = new ArrayByteBufferPool.Quadratic();
String dump1 = pool1.dump();
assertThat(dump1, containsString("direct size=7\n"));
assertThat(dump1, containsString("{capacity=1024,"));
assertThat(dump1, containsString("{capacity=2048,"));
assertThat(dump1, containsString("{capacity=4096,"));
assertThat(dump1, containsString("{capacity=8192,"));
assertThat(dump1, containsString("{capacity=16384,"));
assertThat(dump1, containsString("{capacity=32768,"));
assertThat(dump1, containsString("{capacity=65536,"));

ArrayByteBufferPool pool2 = new ArrayByteBufferPool.Quadratic(100, 800, Integer.MAX_VALUE);
String dump2 = pool2.dump();
assertThat(dump2, containsString("direct size=4\n"));
assertThat(dump2, containsString("{capacity=128,"));
assertThat(dump2, containsString("{capacity=256,"));
assertThat(dump2, containsString("{capacity=512,"));
assertThat(dump2, containsString("{capacity=800,"));

ArrayByteBufferPool pool3 = new ArrayByteBufferPool.Quadratic(1, 200, Integer.MAX_VALUE);
String dump3 = pool3.dump();
assertThat(dump3, containsString("direct size=9\n"));
assertThat(dump3, containsString("{capacity=1,"));
assertThat(dump3, containsString("{capacity=2,"));
assertThat(dump3, containsString("{capacity=4,"));
assertThat(dump3, containsString("{capacity=8,"));
assertThat(dump3, containsString("{capacity=16,"));
assertThat(dump3, containsString("{capacity=32,"));
assertThat(dump3, containsString("{capacity=64,"));
assertThat(dump3, containsString("{capacity=128,"));
assertThat(dump3, containsString("{capacity=200,"));
}

if (capacity >= pool.getMaxCapacity())
break;
@Test
public void testWithBucketCapacitiesBucketSizes()
{
{
ArrayByteBufferPool pool = new ArrayByteBufferPool.WithBucketCapacities(1024, 65536);
String dump = pool.dump();
assertThat(dump, containsString("direct size=2\n"));
assertThat(dump, containsString("{capacity=1024,"));
assertThat(dump, containsString("{capacity=65536,"));
}
{
ArrayByteBufferPool pool = new ArrayByteBufferPool.WithBucketCapacities(30, 24);
String dump = pool.dump();
assertThat(dump, containsString("direct size=2\n"));
assertThat(dump, containsString("{capacity=24,"));
assertThat(dump, containsString("{capacity=30,"));
}
{
ArrayByteBufferPool pool = new ArrayByteBufferPool.WithBucketCapacities(3, 7, 100);
String dump = pool.dump();
assertThat(dump, containsString("direct size=3\n"));
assertThat(dump, containsString("{capacity=3,"));
assertThat(dump, containsString("{capacity=7,"));
assertThat(dump, containsString("{capacity=100,"));
}
}

b = pool.acquire(capacity + 1, false);
assertThat(b.capacity(), Matchers.is(capacity * 2));
@Test
public void testWithBucketCapacitiesNoBucketSizes()
{
{
ArrayByteBufferPool pool = new ArrayByteBufferPool.WithBucketCapacities(1, 100);
pool.setStatisticsEnabled(true);
pool.acquire(200, false).release();
pool.acquire(300, false).release();
pool.acquire(800, false).release();
pool.acquire(150, false).release();
String dump = pool.dump();
assertThat(dump, containsString("200: 2\n"));
assertThat(dump, containsString("300: 1\n"));
assertThat(dump, containsString("800: 1\n"));
}
{
ArrayByteBufferPool pool = new ArrayByteBufferPool.WithBucketCapacities(1, 7, 50, 100);
pool.setStatisticsEnabled(true);
pool.acquire(200, false).release();
pool.acquire(300, false).release();
pool.acquire(800, false).release();
pool.acquire(150, false).release();
String dump = pool.dump();
assertThat(dump, containsString("200: 2\n"));
assertThat(dump, containsString("300: 1\n"));
assertThat(dump, containsString("800: 1\n"));
}
{
ArrayByteBufferPool pool = new ArrayByteBufferPool.WithBucketCapacities(128, 512, 2048);
pool.setStatisticsEnabled(true);
pool.acquire(8192, false).release();
String dump = pool.dump();
assertThat(dump, containsString("8192: 1\n"));
}
}

capacity = capacity * 2;
@Test
public void testWithBucketCapacitiesStats()
{
ArrayByteBufferPool pool = new ArrayByteBufferPool.WithBucketCapacities(1024, 65536);
pool.setStatisticsEnabled(true);
for (int i = 0; i < 5; i++)
{
pool.acquire(1023, false).release();
}
pool.acquire(2048, false).release();
pool.acquire(4096, false).release();
pool.acquire(4096, false).release();
pool.acquire(6144, false).release();
pool.acquire(65536 + 1, false).release();
pool.acquire(65536 + 2, false).release();
pool.acquire(65536 * 2 + 1, false).release();
pool.acquire(65536 * 3 - 1, false).release();
String dump = pool.dump();
assertThat(dump, containsString("{capacity=1024,in-use=0/1,pooled/acquires=4/5(80.000%),avgsize=1023,non-pooled/evicts/removes/releases=0/0/0/5}"));
assertThat(dump, containsString("{capacity=65536,in-use=0/1,pooled/acquires=3/4(75.000%),avgsize=4096,non-pooled/evicts/removes/releases=0/0/0/4}"));
assertThat(dump, containsString("131072: 2\n"));
assertThat(dump, containsString("196608: 2\n"));
}

@Test
Expand Down
Loading