diff --git a/hollow/src/main/java/com/netflix/hollow/api/consumer/HollowConsumer.java b/hollow/src/main/java/com/netflix/hollow/api/consumer/HollowConsumer.java
index 3b560aa4b2..027116e1cd 100644
--- a/hollow/src/main/java/com/netflix/hollow/api/consumer/HollowConsumer.java
+++ b/hollow/src/main/java/com/netflix/hollow/api/consumer/HollowConsumer.java
@@ -244,8 +244,8 @@ public void triggerRefresh() {
*
* This is an asynchronous call.
*/
- public void triggerAsyncRefresh() {
- triggerAsyncRefreshWithDelay(0);
+ public CompletableFuture triggerAsyncRefresh() {
+ return triggerAsyncRefreshWithDelay(0);
}
/**
@@ -256,10 +256,10 @@ public void triggerAsyncRefresh() {
*
* @param delayMillis the delay, in millseconds, before triggering the refresh
*/
- public void triggerAsyncRefreshWithDelay(int delayMillis) {
+ public CompletableFuture triggerAsyncRefreshWithDelay(int delayMillis) {
final long targetBeginTime = System.currentTimeMillis() + delayMillis;
- refreshExecutor.execute(() -> {
+ return CompletableFuture.runAsync(() -> {
try {
long delay = targetBeginTime - System.currentTimeMillis();
if (delay > 0)
@@ -278,7 +278,7 @@ public void triggerAsyncRefreshWithDelay(int delayMillis) {
LOG.log(Level.SEVERE, "Async refresh failed", e);
throw e;
}
- });
+ }, refreshExecutor);
}
/**
diff --git a/hollow/src/test/java/com/netflix/hollow/api/consumer/HollowProducerConsumerTests.java b/hollow/src/test/java/com/netflix/hollow/api/consumer/HollowProducerConsumerTests.java
index bb6bca48cc..84b847c9ba 100644
--- a/hollow/src/test/java/com/netflix/hollow/api/consumer/HollowProducerConsumerTests.java
+++ b/hollow/src/test/java/com/netflix/hollow/api/consumer/HollowProducerConsumerTests.java
@@ -32,6 +32,7 @@
import com.netflix.hollow.tools.compact.HollowCompactor.CompactionConfig;
import java.time.Duration;
import java.util.BitSet;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Before;
@@ -487,6 +488,28 @@ public void consumerFilteringSupport() {
Assert.fail(); // fail if UnsupportedOperationException was not thrown
}
+ @Test
+ public void consumerErrorsDuringRefreshArePropagated() {
+ HollowProducer producer = HollowProducer.withPublisher(blobStore)
+ .withAnnouncer(announcement)
+ .withBlobStager(new HollowInMemoryBlobStager())
+ .build();
+ long v1 = runCycle(producer, 1);
+
+ InMemoryBlobStore otherBlobStore = new InMemoryBlobStore();
+ HollowConsumer consumer = HollowConsumer.withBlobRetriever(otherBlobStore)
+ .withAnnouncementWatcher(announcement)
+ .build();
+
+ try {
+ consumer.triggerAsyncRefresh().toCompletableFuture().join();
+ Assert.fail("Expected exception to be thrown by async refresh.");
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof CompletionException);
+ Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
+ }
+ }
+
private long runCycle(HollowProducer producer, final int cycleNumber) {
return producer.runCycle(state -> state.add(cycleNumber));
}