Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ben-manes committed Jan 13, 2025
1 parent 25405d6 commit 9e47dfe
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1929,18 +1929,6 @@ public void run() {
accessOrderWindowDeque().offerLast(node);
}
}

// Ensure that in-flight async computation cannot expire (reset on a completion callback)
if (isComputingAsync(node.getValue())) {
synchronized (node) {
if (!Async.isReady((CompletableFuture<?>) node.getValue())) {
long expirationTime = expirationTicker().read() + ASYNC_EXPIRY;
setVariableTime(node, expirationTime);
setAccessTime(node, expirationTime);
setWriteTime(node, expirationTime);
}
}
}
}
}

Expand Down Expand Up @@ -2311,6 +2299,14 @@ public void putAll(Map<? extends K, ? extends V> map) {
node = nodeFactory.newNode(key, keyReferenceQueue(),
value, valueReferenceQueue(), newWeight, now);
setVariableTime(node, expireAfterCreate(key, value, expiry, now));
if (isComputingAsync(value)) {
long expirationTime = now + ASYNC_EXPIRY;
setAccessTime(node, expirationTime);
setWriteTime(node, expirationTime);
} else {
setAccessTime(node, now);
setWriteTime(node, now);
}
}
prior = data.putIfAbsent(node.getKeyReference(), node);
if (prior == null) {
Expand Down Expand Up @@ -2395,11 +2391,14 @@ public void putAll(Map<? extends K, ? extends V> map) {
exceedsTolerance =
(expiresAfterWrite() && (now - prior.getWriteTime()) > EXPIRE_WRITE_TOLERANCE)
|| (expiresVariable()
&& Math.abs(varTime - prior.getVariableTime()) > EXPIRE_WRITE_TOLERANCE);
&& Math.abs(varTime - prior.getVariableTime()) > EXPIRE_WRITE_TOLERANCE)
|| (refreshAfterWrite() && refreshes().containsKey(prior.getKeyReference()));
if (expired || exceedsTolerance) {
setWriteTime(prior, isComputingAsync(value) ? (now + ASYNC_EXPIRY) : now);
}

prior.setValue(value, valueReferenceQueue());
prior.setWeight(newWeight);
setWriteTime(prior, now);

discardRefresh(prior.getKeyReference());
}
Expand All @@ -2422,9 +2421,6 @@ public void putAll(Map<? extends K, ? extends V> map) {
} else if (!onlyIfAbsent && exceedsTolerance) {
afterWrite(new UpdateTask(prior, weightedDifference));
} else {
if (mayUpdate) {
setWriteTime(prior, now);
}
afterRead(prior, now, /* recordHit= */ false);
}

Expand Down Expand Up @@ -2549,8 +2545,15 @@ public boolean remove(Object key, Object value) {
n.setWeight(weight);

setVariableTime(n, varTime);
setAccessTime(n, now[0]);
setWriteTime(n, now[0]);

if (isComputingAsync(value)) {
long expirationTime = now[0] + ASYNC_EXPIRY;
setAccessTime(n, expirationTime);
setWriteTime(n, expirationTime);
} else {
setAccessTime(n, now[0]);
setWriteTime(n, now[0]);
}
discardRefresh(k);
return n;
}
Expand Down Expand Up @@ -2606,8 +2609,14 @@ public boolean replace(K key, V oldValue, V newValue, boolean shouldDiscardRefre
n.setWeight(weight);

setVariableTime(n, varTime);
setAccessTime(n, now[0]);
setWriteTime(n, now[0]);
if (isComputingAsync(newValue)) {
long expirationTime = now[0] + ASYNC_EXPIRY;
setAccessTime(n, expirationTime);
setWriteTime(n, expirationTime);
} else {
setAccessTime(n, now[0]);
setWriteTime(n, now[0]);
}

if (shouldDiscardRefresh) {
discardRefresh(k);
Expand Down Expand Up @@ -2697,6 +2706,14 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
var created = nodeFactory.newNode(key, keyReferenceQueue(),
newValue[0], valueReferenceQueue(), weight[1], now[0]);
setVariableTime(created, expireAfterCreate(key, newValue[0], expiry(), now[0]));
if (isComputingAsync(newValue[0])) {
long expirationTime = now[0] + ASYNC_EXPIRY;
setAccessTime(created, expirationTime);
setWriteTime(created, expirationTime);
} else {
setAccessTime(created, now[0]);
setWriteTime(created, now[0]);
}
return created;
}

Expand Down Expand Up @@ -2730,8 +2747,14 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
n.setWeight(weight[1]);

setVariableTime(n, varTime);
setAccessTime(n, now[0]);
setWriteTime(n, now[0]);
if (isComputingAsync(newValue[0])) {
long expirationTime = now[0] + ASYNC_EXPIRY;
setAccessTime(n, expirationTime);
setWriteTime(n, expirationTime);
} else {
setAccessTime(n, now[0]);
setWriteTime(n, now[0]);
}
discardRefresh(k);
return n;
}
Expand Down Expand Up @@ -2868,8 +2891,14 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
var created = nodeFactory.newNode(keyRef, newValue[0],
valueReferenceQueue(), weight[1], now[0]);
setVariableTime(created, varTime);
setAccessTime(created, now[0]);
setWriteTime(created, now[0]);
if (isComputingAsync(newValue[0])) {
long expirationTime = now[0] + ASYNC_EXPIRY;
setAccessTime(created, expirationTime);
setWriteTime(created, expirationTime);
} else {
setAccessTime(created, now[0]);
setWriteTime(created, now[0]);
}
discardRefresh(key);
return created;
}
Expand Down Expand Up @@ -2921,8 +2950,14 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
n.setWeight(weight[1]);

setVariableTime(n, varTime);
setAccessTime(n, now[0]);
setWriteTime(n, now[0]);
if (isComputingAsync(newValue[0])) {
long expirationTime = now[0] + ASYNC_EXPIRY;
setAccessTime(n, expirationTime);
setWriteTime(n, expirationTime);
} else {
setAccessTime(n, now[0]);
setWriteTime(n, now[0]);
}
discardRefresh(kr);
return n;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.truth.Truth.assertThat;
import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -317,97 +318,99 @@ public void testExpirationOrder_access() {
assertThat(keySet).containsExactly(0, 1, 2, 5, 7, 9);
}

// Note: Caffeine skips TTL updates for writes to the same entry within a small tolerance
public void testExpirationOrder_write() throws ExecutionException {
// test lru within a single segment
FakeTicker ticker = new FakeTicker();
IdentityLoader<Integer> loader = identityLoader();
LoadingCache<Integer, Integer> cache = CaffeinatedGuava.build(Caffeine.newBuilder()
.expireAfterWrite(11, MILLISECONDS)
.expireAfterWrite(11, SECONDS)
.ticker(ticker::read),
loader);
for (int i = 0; i < 10; i++) {
cache.getUnchecked(i);
ticker.advance(1, MILLISECONDS);
ticker.advance(1, SECONDS);
}
Set<Integer> keySet = cache.asMap().keySet();
assertThat(keySet).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);

// 0 expires
ticker.advance(1, MILLISECONDS);
ticker.advance(1, SECONDS);
assertThat(keySet).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9);

// get doesn't stop 1 from expiring
getAll(cache, asList(0, 1, 2));
CacheTesting.drainRecencyQueues(cache);
ticker.advance(1, MILLISECONDS);
ticker.advance(1, SECONDS);
assertThat(keySet).containsExactly(2, 3, 4, 5, 6, 7, 8, 9, 0);

// get(K, Callable) doesn't stop 2 from expiring
cache.get(2, Callables.returning(-2));
CacheTesting.drainRecencyQueues(cache);
ticker.advance(1, MILLISECONDS);
ticker.advance(1, SECONDS);
assertThat(keySet).containsExactly(3, 4, 5, 6, 7, 8, 9, 0);

// asMap.put saves 3
cache.asMap().put(3, -3);
ticker.advance(1, MILLISECONDS);
ticker.advance(1, SECONDS);
assertThat(keySet).containsExactly(4, 5, 6, 7, 8, 9, 0, 3);

// asMap.replace saves 4
cache.asMap().replace(4, -4);
ticker.advance(1, MILLISECONDS);
ticker.advance(1, SECONDS);
assertThat(keySet).containsExactly(5, 6, 7, 8, 9, 0, 3, 4);

// 5 expires
ticker.advance(1, MILLISECONDS);
ticker.advance(1, SECONDS);
assertThat(keySet).containsExactly(6, 7, 8, 9, 0, 3, 4);
}

// Note: Caffeine skips TTL updates for writes to the same entry within a small tolerance
public void testExpirationOrder_writeAccess() throws ExecutionException {
// test lru within a single segment
FakeTicker ticker = new FakeTicker();
IdentityLoader<Integer> loader = identityLoader();
LoadingCache<Integer, Integer> cache = CaffeinatedGuava.build(Caffeine.newBuilder()
.expireAfterWrite(5, MILLISECONDS)
.expireAfterAccess(3, MILLISECONDS)
.expireAfterWrite(5, SECONDS)
.expireAfterAccess(3, SECONDS)
.ticker(ticker::read),
loader);
for (int i = 0; i < 5; i++) {
cache.getUnchecked(i);
}
ticker.advance(1, MILLISECONDS);
ticker.advance(1, SECONDS);
for (int i = 5; i < 10; i++) {
cache.getUnchecked(i);
}
ticker.advance(1, MILLISECONDS);
ticker.advance(1, SECONDS);

Set<Integer> keySet = cache.asMap().keySet();
assertThat(keySet).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);

// get saves 1, 3; 0, 2, 4 expire
getAll(cache, asList(1, 3));
CacheTesting.drainRecencyQueues(cache);
ticker.advance(1, MILLISECONDS);
ticker.advance(1, SECONDS);
assertThat(keySet).containsExactly(5, 6, 7, 8, 9, 1, 3);

// get saves 6, 8; 5, 7, 9 expire
getAll(cache, asList(6, 8));
CacheTesting.drainRecencyQueues(cache);
ticker.advance(1, MILLISECONDS);
ticker.advance(1, SECONDS);
assertThat(keySet).containsExactly(1, 3, 6, 8);

// get fails to save 1, put saves 3
cache.asMap().put(3, -3);
getAll(cache, asList(1));
CacheTesting.drainRecencyQueues(cache);
ticker.advance(1, MILLISECONDS);
ticker.advance(1, SECONDS);
assertThat(keySet).containsExactly(6, 8, 3);

// get(K, Callable) fails to save 8, replace saves 6
cache.asMap().replace(6, -6);
cache.get(8, Callables.returning(-8));
CacheTesting.drainRecencyQueues(cache);
ticker.advance(1, MILLISECONDS);
ticker.advance(1, SECONDS);
assertThat(keySet).containsExactly(3, 6);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package com.github.benmanes.caffeine.guava.compatibility;

import static com.github.benmanes.caffeine.guava.compatibility.TestingCacheLoaders.incrementingLoader;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.guava.CaffeinatedGuava;
Expand All @@ -33,12 +33,14 @@
*/
@SuppressWarnings("PreferJavaTimeOverload")
public class CacheRefreshTest extends TestCase {

// Note: Caffeine skips TTL updates for writes to the same entry within a small tolerance
public void testAutoRefresh() {
FakeTicker ticker = new FakeTicker();
IncrementingLoader loader = incrementingLoader();
LoadingCache<Integer, Integer> cache = CaffeinatedGuava.build(Caffeine.newBuilder()
.refreshAfterWrite(3, MILLISECONDS)
.expireAfterWrite(6, MILLISECONDS)
.refreshAfterWrite(3, SECONDS)
.expireAfterWrite(6, SECONDS)
.executor(MoreExecutors.directExecutor())
.ticker(ticker::read), loader);
int expectedLoads = 0;
Expand All @@ -48,7 +50,7 @@ public void testAutoRefresh() {
expectedLoads++;
assertEquals(expectedLoads, loader.getLoadCount());
assertEquals(expectedReloads, loader.getReloadCount());
ticker.advance(1, MILLISECONDS);
ticker.advance(1, SECONDS);
}

assertEquals(Integer.valueOf(0), cache.getUnchecked(0));
Expand All @@ -58,7 +60,7 @@ public void testAutoRefresh() {
assertEquals(expectedReloads, loader.getReloadCount());

// refresh 0
ticker.advance(1, MILLISECONDS);
ticker.advance(1, SECONDS);
cache.getUnchecked(0); // Allow refresh to return old value while refreshing
assertEquals(Integer.valueOf(1), cache.getUnchecked(0));
expectedReloads++;
Expand All @@ -69,15 +71,15 @@ public void testAutoRefresh() {

// write to 1 to delay its refresh
cache.asMap().put(1, -1);
ticker.advance(1, MILLISECONDS);
ticker.advance(1, SECONDS);
assertEquals(Integer.valueOf(1), cache.getUnchecked(0));
assertEquals(Integer.valueOf(-1), cache.getUnchecked(1));
assertEquals(Integer.valueOf(2), cache.getUnchecked(2));
assertEquals(expectedLoads, loader.getLoadCount());
assertEquals(expectedReloads, loader.getReloadCount());

// refresh 2
ticker.advance(1, MILLISECONDS);
ticker.advance(1, SECONDS);
cache.getUnchecked(2); // Allow refresh to return old value while refreshing
assertEquals(Integer.valueOf(1), cache.getUnchecked(0));
assertEquals(Integer.valueOf(-1), cache.getUnchecked(1));
Expand All @@ -86,15 +88,15 @@ public void testAutoRefresh() {
assertEquals(expectedLoads, loader.getLoadCount());
assertEquals(expectedReloads, loader.getReloadCount());

ticker.advance(1, MILLISECONDS);
ticker.advance(1, SECONDS);
assertEquals(Integer.valueOf(1), cache.getUnchecked(0));
assertEquals(Integer.valueOf(-1), cache.getUnchecked(1));
assertEquals(Integer.valueOf(3), cache.getUnchecked(2));
assertEquals(expectedLoads, loader.getLoadCount());
assertEquals(expectedReloads, loader.getReloadCount());

// refresh 0 and 1
ticker.advance(1, MILLISECONDS);
ticker.advance(1, SECONDS);
cache.getUnchecked(0); // Allow refresh to return old value while refreshing
cache.getUnchecked(1); // Allow refresh to return old value while refreshing
assertEquals(Integer.valueOf(2), cache.getUnchecked(0));
Expand Down

0 comments on commit 9e47dfe

Please sign in to comment.