From 3be2a512828ee52062c7d4a4ffb766c578c88d5d Mon Sep 17 00:00:00 2001 From: Ben Manes Date: Wed, 15 Jan 2025 16:31:43 -0800 Subject: [PATCH] wip --- .../caffeine/cache/BoundedLocalCache.java | 66 ++++++++++++------- .../gradle/libs.versions.toml | 2 +- gradle/libs.versions.toml | 2 +- .../compatibility/CacheExpirationTest.java | 35 +++++----- .../guava/compatibility/CacheRefreshTest.java | 20 +++--- 5 files changed, 73 insertions(+), 52 deletions(-) diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index e4c07c5458..c27cd55daa 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -1523,6 +1523,14 @@ void setAccessTime(Node node, long now) { } } + /** Returns if the entry's write time would exceed the minimum expiration reorder threshold. */ + boolean exceedsWriteTimeTolerance(Node node, long varTime, long now) { + return (expiresAfterWrite() && (now - node.getWriteTime()) > EXPIRE_WRITE_TOLERANCE) + || (expiresVariable() + && Math.abs(varTime - node.getVariableTime()) > EXPIRE_WRITE_TOLERANCE) + || (refreshAfterWrite() && refreshes().containsKey(node.getKeyReference())); + } + /** * Performs the post-processing work required after a write. * @@ -2298,8 +2306,8 @@ public void putAll(Map map) { if (node == null) { node = nodeFactory.newNode(key, keyReferenceQueue(), value, valueReferenceQueue(), newWeight, now); - setVariableTime(node, expireAfterCreate(key, value, expiry, now)); long expirationTime = isComputingAsync(value) ? (now + ASYNC_EXPIRY) : now; + setVariableTime(node, expireAfterCreate(key, value, expiry, now)); setAccessTime(node, expirationTime); setWriteTime(node, expirationTime); } @@ -2384,11 +2392,10 @@ public void putAll(Map map) { long expirationTime = isComputingAsync(value) ? (now + ASYNC_EXPIRY) : now; if (mayUpdate) { - exceedsTolerance = - (expiresAfterWrite() && (now - prior.getWriteTime()) > EXPIRE_WRITE_TOLERANCE) - || (expiresVariable() - && Math.abs(varTime - prior.getVariableTime()) > EXPIRE_WRITE_TOLERANCE); - setWriteTime(prior, expirationTime); + exceedsTolerance = exceedsWriteTimeTolerance(prior, varTime, now); + if (expired || exceedsTolerance) { + setWriteTime(prior, isComputingAsync(value) ? (now + ASYNC_EXPIRY) : now); + } prior.setValue(value, valueReferenceQueue()); prior.setWeight(newWeight); @@ -2514,8 +2521,9 @@ public boolean remove(Object key, Object value) { requireNonNull(key); requireNonNull(value); - long[] now = new long[1]; + var now = new long[1]; var oldWeight = new int[1]; + var exceedsTolerance = new boolean[1]; @SuppressWarnings({"unchecked", "Varifier"}) @Nullable K[] nodeKey = (K[]) new Object[1]; @SuppressWarnings({"unchecked", "Varifier"}) @@ -2538,8 +2546,11 @@ public boolean remove(Object key, Object value) { n.setWeight(weight); long expirationTime = isComputingAsync(value) ? (now[0] + ASYNC_EXPIRY) : now[0]; + exceedsTolerance[0] = exceedsWriteTimeTolerance(n, varTime, expirationTime); + if (exceedsTolerance[0]) { + setWriteTime(n, expirationTime); + } setAccessTime(n, expirationTime); - setWriteTime(n, expirationTime); setVariableTime(n, varTime); discardRefresh(k); @@ -2552,7 +2563,7 @@ public boolean remove(Object key, Object value) { } int weightedDifference = (weight - oldWeight[0]); - if (expiresAfterWrite() || (weightedDifference != 0)) { + if (exceedsTolerance[0] || (weightedDifference != 0)) { afterWrite(new UpdateTask(node, weightedDifference)); } else { afterRead(node, now[0], /* recordHit= */ false); @@ -2573,13 +2584,15 @@ public boolean replace(K key, V oldValue, V newValue, boolean shouldDiscardRefre requireNonNull(oldValue); requireNonNull(newValue); - int weight = weigher.weigh(key, newValue); + var now = new long[1]; + var oldWeight = new int[1]; + var exceedsTolerance = new boolean[1]; @SuppressWarnings({"unchecked", "Varifier"}) @Nullable K[] nodeKey = (K[]) new Object[1]; @SuppressWarnings({"unchecked", "Varifier"}) @Nullable V[] prevValue = (V[]) new Object[1]; - int[] oldWeight = new int[1]; - long[] now = new long[1]; + + int weight = weigher.weigh(key, newValue); Node node = data.computeIfPresent(nodeFactory.newLookupKey(key), (k, n) -> { synchronized (n) { requireIsAlive(key, n); @@ -2597,8 +2610,11 @@ public boolean replace(K key, V oldValue, V newValue, boolean shouldDiscardRefre n.setWeight(weight); long expirationTime = isComputingAsync(newValue) ? (now[0] + ASYNC_EXPIRY) : now[0]; + exceedsTolerance[0] = exceedsWriteTimeTolerance(n, varTime, expirationTime); + if (exceedsTolerance[0]) { + setWriteTime(n, expirationTime); + } setAccessTime(n, expirationTime); - setWriteTime(n, expirationTime); setVariableTime(n, varTime); if (shouldDiscardRefresh) { @@ -2613,7 +2629,7 @@ public boolean replace(K key, V oldValue, V newValue, boolean shouldDiscardRefre } int weightedDifference = (weight - oldWeight[0]); - if (expiresAfterWrite() || (weightedDifference != 0)) { + if (exceedsTolerance[0] || (weightedDifference != 0)) { afterWrite(new UpdateTask(node, weightedDifference)); } else { afterRead(node, now[0], /* recordHit= */ false); @@ -2688,8 +2704,8 @@ public void replaceAll(BiFunction function) { weight[1] = weigher.weigh(key, newValue[0]); var created = nodeFactory.newNode(key, keyReferenceQueue(), newValue[0], valueReferenceQueue(), weight[1], now[0]); - setVariableTime(created, expireAfterCreate(key, newValue[0], expiry(), now[0])); long expirationTime = isComputingAsync(newValue[0]) ? (now[0] + ASYNC_EXPIRY) : now[0]; + setVariableTime(created, expireAfterCreate(key, newValue[0], expiry(), now[0])); setAccessTime(created, expirationTime); setWriteTime(created, expirationTime); return created; @@ -2724,15 +2740,11 @@ public void replaceAll(BiFunction function) { n.setValue(newValue[0], valueReferenceQueue()); n.setWeight(weight[1]); + long expirationTime = isComputingAsync(newValue[0]) ? (now[0] + ASYNC_EXPIRY) : now[0]; + setAccessTime(n, expirationTime); + setWriteTime(n, expirationTime); setVariableTime(n, varTime); - if (isComputingAsync(newValue[0])) { - long expirationTime = now[0] + ASYNC_EXPIRY; - setAccessTime(n, expirationTime); - setWriteTime(n, expirationTime); - } else { - setAccessTime(n, now[0]); - setWriteTime(n, now[0]); - } + discardRefresh(k); return n; } @@ -2853,6 +2865,7 @@ public void replaceAll(BiFunction function) { var weight = new int[2]; // old, new var cause = new RemovalCause[1]; + var exceedsTolerance = new boolean[1]; Node node = data.compute(keyRef, (kr, n) -> { if (n == null) { @@ -2925,8 +2938,11 @@ public void replaceAll(BiFunction function) { n.setWeight(weight[1]); long expirationTime = isComputingAsync(newValue[0]) ? (now[0] + ASYNC_EXPIRY) : now[0]; + exceedsTolerance[0] = exceedsWriteTimeTolerance(n, varTime, expirationTime); + if (cause[0].wasEvicted() || exceedsTolerance[0]) { + setWriteTime(n, expirationTime); + } setAccessTime(n, expirationTime); - setWriteTime(n, expirationTime); setVariableTime(n, varTime); discardRefresh(kr); @@ -2954,7 +2970,7 @@ public void replaceAll(BiFunction function) { afterWrite(new AddTask(node, weight[1])); } else { int weightedDifference = weight[1] - weight[0]; - if (expiresAfterWrite() || (weightedDifference != 0)) { + if (exceedsTolerance[0] || (weightedDifference != 0)) { afterWrite(new UpdateTask(node, weightedDifference)); } else { afterRead(node, now[0], /* recordHit= */ false); diff --git a/examples/coalescing-bulkloader-reactor/gradle/libs.versions.toml b/examples/coalescing-bulkloader-reactor/gradle/libs.versions.toml index 5415fed147..299c2ec435 100644 --- a/examples/coalescing-bulkloader-reactor/gradle/libs.versions.toml +++ b/examples/coalescing-bulkloader-reactor/gradle/libs.versions.toml @@ -1,7 +1,7 @@ [versions] caffeine = "3.1.8" junit = "5.11.4" -reactor = "3.7.1" +reactor = "3.7.2" truth = "1.4.4" versions = "0.51.0" diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 74837298ce..b32476ebd4 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -4,7 +4,7 @@ auto-value = "1.11.0" awaitility = "4.2.2" bcel = "6.10.0" bnd = "7.1.0" -bouncycastle-jdk18on = "1.79" +bouncycastle-jdk18on = "1.80" cache2k = "2.6.1.Final" caffeine = "3.1.8" checkstyle = "10.21.1" diff --git a/guava/src/test/java/com/github/benmanes/caffeine/guava/compatibility/CacheExpirationTest.java b/guava/src/test/java/com/github/benmanes/caffeine/guava/compatibility/CacheExpirationTest.java index 7d2d7ad7c2..d21c314212 100644 --- a/guava/src/test/java/com/github/benmanes/caffeine/guava/compatibility/CacheExpirationTest.java +++ b/guava/src/test/java/com/github/benmanes/caffeine/guava/compatibility/CacheExpirationTest.java @@ -19,6 +19,7 @@ import static com.google.common.truth.Truth.assertThat; import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import java.util.List; import java.util.Set; @@ -317,69 +318,71 @@ public void testExpirationOrder_access() { assertThat(keySet).containsExactly(0, 1, 2, 5, 7, 9); } + // Note: Caffeine skips TTL updates for writes to the same entry within a small tolerance public void testExpirationOrder_write() throws ExecutionException { // test lru within a single segment FakeTicker ticker = new FakeTicker(); IdentityLoader loader = identityLoader(); LoadingCache cache = CaffeinatedGuava.build(Caffeine.newBuilder() - .expireAfterWrite(11, MILLISECONDS) + .expireAfterWrite(11, SECONDS) .ticker(ticker::read), loader); for (int i = 0; i < 10; i++) { cache.getUnchecked(i); - ticker.advance(1, MILLISECONDS); + ticker.advance(1, SECONDS); } Set keySet = cache.asMap().keySet(); assertThat(keySet).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); // 0 expires - ticker.advance(1, MILLISECONDS); + ticker.advance(1, SECONDS); assertThat(keySet).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9); // get doesn't stop 1 from expiring getAll(cache, asList(0, 1, 2)); CacheTesting.drainRecencyQueues(cache); - ticker.advance(1, MILLISECONDS); + ticker.advance(1, SECONDS); assertThat(keySet).containsExactly(2, 3, 4, 5, 6, 7, 8, 9, 0); // get(K, Callable) doesn't stop 2 from expiring cache.get(2, Callables.returning(-2)); CacheTesting.drainRecencyQueues(cache); - ticker.advance(1, MILLISECONDS); + ticker.advance(1, SECONDS); assertThat(keySet).containsExactly(3, 4, 5, 6, 7, 8, 9, 0); // asMap.put saves 3 cache.asMap().put(3, -3); - ticker.advance(1, MILLISECONDS); + ticker.advance(1, SECONDS); assertThat(keySet).containsExactly(4, 5, 6, 7, 8, 9, 0, 3); // asMap.replace saves 4 cache.asMap().replace(4, -4); - ticker.advance(1, MILLISECONDS); + ticker.advance(1, SECONDS); assertThat(keySet).containsExactly(5, 6, 7, 8, 9, 0, 3, 4); // 5 expires - ticker.advance(1, MILLISECONDS); + ticker.advance(1, SECONDS); assertThat(keySet).containsExactly(6, 7, 8, 9, 0, 3, 4); } + // Note: Caffeine skips TTL updates for writes to the same entry within a small tolerance public void testExpirationOrder_writeAccess() throws ExecutionException { // test lru within a single segment FakeTicker ticker = new FakeTicker(); IdentityLoader loader = identityLoader(); LoadingCache cache = CaffeinatedGuava.build(Caffeine.newBuilder() - .expireAfterWrite(5, MILLISECONDS) - .expireAfterAccess(3, MILLISECONDS) + .expireAfterWrite(5, SECONDS) + .expireAfterAccess(3, SECONDS) .ticker(ticker::read), loader); for (int i = 0; i < 5; i++) { cache.getUnchecked(i); } - ticker.advance(1, MILLISECONDS); + ticker.advance(1, SECONDS); for (int i = 5; i < 10; i++) { cache.getUnchecked(i); } - ticker.advance(1, MILLISECONDS); + ticker.advance(1, SECONDS); Set keySet = cache.asMap().keySet(); assertThat(keySet).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); @@ -387,27 +390,27 @@ public void testExpirationOrder_writeAccess() throws ExecutionException { // get saves 1, 3; 0, 2, 4 expire getAll(cache, asList(1, 3)); CacheTesting.drainRecencyQueues(cache); - ticker.advance(1, MILLISECONDS); + ticker.advance(1, SECONDS); assertThat(keySet).containsExactly(5, 6, 7, 8, 9, 1, 3); // get saves 6, 8; 5, 7, 9 expire getAll(cache, asList(6, 8)); CacheTesting.drainRecencyQueues(cache); - ticker.advance(1, MILLISECONDS); + ticker.advance(1, SECONDS); assertThat(keySet).containsExactly(1, 3, 6, 8); // get fails to save 1, put saves 3 cache.asMap().put(3, -3); getAll(cache, asList(1)); CacheTesting.drainRecencyQueues(cache); - ticker.advance(1, MILLISECONDS); + ticker.advance(1, SECONDS); assertThat(keySet).containsExactly(6, 8, 3); // get(K, Callable) fails to save 8, replace saves 6 cache.asMap().replace(6, -6); cache.get(8, Callables.returning(-8)); CacheTesting.drainRecencyQueues(cache); - ticker.advance(1, MILLISECONDS); + ticker.advance(1, SECONDS); assertThat(keySet).containsExactly(3, 6); } diff --git a/guava/src/test/java/com/github/benmanes/caffeine/guava/compatibility/CacheRefreshTest.java b/guava/src/test/java/com/github/benmanes/caffeine/guava/compatibility/CacheRefreshTest.java index c538fbd8b7..df91d72acf 100644 --- a/guava/src/test/java/com/github/benmanes/caffeine/guava/compatibility/CacheRefreshTest.java +++ b/guava/src/test/java/com/github/benmanes/caffeine/guava/compatibility/CacheRefreshTest.java @@ -15,7 +15,7 @@ package com.github.benmanes.caffeine.guava.compatibility; import static com.github.benmanes.caffeine.guava.compatibility.TestingCacheLoaders.incrementingLoader; -import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.guava.CaffeinatedGuava; @@ -33,12 +33,14 @@ */ @SuppressWarnings("PreferJavaTimeOverload") public class CacheRefreshTest extends TestCase { + + // Note: Caffeine skips TTL updates for writes to the same entry within a small tolerance public void testAutoRefresh() { FakeTicker ticker = new FakeTicker(); IncrementingLoader loader = incrementingLoader(); LoadingCache cache = CaffeinatedGuava.build(Caffeine.newBuilder() - .refreshAfterWrite(3, MILLISECONDS) - .expireAfterWrite(6, MILLISECONDS) + .refreshAfterWrite(3, SECONDS) + .expireAfterWrite(6, SECONDS) .executor(MoreExecutors.directExecutor()) .ticker(ticker::read), loader); int expectedLoads = 0; @@ -48,7 +50,7 @@ public void testAutoRefresh() { expectedLoads++; assertEquals(expectedLoads, loader.getLoadCount()); assertEquals(expectedReloads, loader.getReloadCount()); - ticker.advance(1, MILLISECONDS); + ticker.advance(1, SECONDS); } assertEquals(Integer.valueOf(0), cache.getUnchecked(0)); @@ -58,7 +60,7 @@ public void testAutoRefresh() { assertEquals(expectedReloads, loader.getReloadCount()); // refresh 0 - ticker.advance(1, MILLISECONDS); + ticker.advance(1, SECONDS); cache.getUnchecked(0); // Allow refresh to return old value while refreshing assertEquals(Integer.valueOf(1), cache.getUnchecked(0)); expectedReloads++; @@ -69,7 +71,7 @@ public void testAutoRefresh() { // write to 1 to delay its refresh cache.asMap().put(1, -1); - ticker.advance(1, MILLISECONDS); + ticker.advance(1, SECONDS); assertEquals(Integer.valueOf(1), cache.getUnchecked(0)); assertEquals(Integer.valueOf(-1), cache.getUnchecked(1)); assertEquals(Integer.valueOf(2), cache.getUnchecked(2)); @@ -77,7 +79,7 @@ public void testAutoRefresh() { assertEquals(expectedReloads, loader.getReloadCount()); // refresh 2 - ticker.advance(1, MILLISECONDS); + ticker.advance(1, SECONDS); cache.getUnchecked(2); // Allow refresh to return old value while refreshing assertEquals(Integer.valueOf(1), cache.getUnchecked(0)); assertEquals(Integer.valueOf(-1), cache.getUnchecked(1)); @@ -86,7 +88,7 @@ public void testAutoRefresh() { assertEquals(expectedLoads, loader.getLoadCount()); assertEquals(expectedReloads, loader.getReloadCount()); - ticker.advance(1, MILLISECONDS); + ticker.advance(1, SECONDS); assertEquals(Integer.valueOf(1), cache.getUnchecked(0)); assertEquals(Integer.valueOf(-1), cache.getUnchecked(1)); assertEquals(Integer.valueOf(3), cache.getUnchecked(2)); @@ -94,7 +96,7 @@ public void testAutoRefresh() { assertEquals(expectedReloads, loader.getReloadCount()); // refresh 0 and 1 - ticker.advance(1, MILLISECONDS); + ticker.advance(1, SECONDS); cache.getUnchecked(0); // Allow refresh to return old value while refreshing cache.getUnchecked(1); // Allow refresh to return old value while refreshing assertEquals(Integer.valueOf(2), cache.getUnchecked(0));