diff --git a/hollow/src/main/java/com/netflix/hollow/api/consumer/HollowConsumer.java b/hollow/src/main/java/com/netflix/hollow/api/consumer/HollowConsumer.java index 3b560aa4b2..027116e1cd 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/consumer/HollowConsumer.java +++ b/hollow/src/main/java/com/netflix/hollow/api/consumer/HollowConsumer.java @@ -244,8 +244,8 @@ public void triggerRefresh() { *

* This is an asynchronous call. */ - public void triggerAsyncRefresh() { - triggerAsyncRefreshWithDelay(0); + public CompletableFuture triggerAsyncRefresh() { + return triggerAsyncRefreshWithDelay(0); } /** @@ -256,10 +256,10 @@ public void triggerAsyncRefresh() { * * @param delayMillis the delay, in millseconds, before triggering the refresh */ - public void triggerAsyncRefreshWithDelay(int delayMillis) { + public CompletableFuture triggerAsyncRefreshWithDelay(int delayMillis) { final long targetBeginTime = System.currentTimeMillis() + delayMillis; - refreshExecutor.execute(() -> { + return CompletableFuture.runAsync(() -> { try { long delay = targetBeginTime - System.currentTimeMillis(); if (delay > 0) @@ -278,7 +278,7 @@ public void triggerAsyncRefreshWithDelay(int delayMillis) { LOG.log(Level.SEVERE, "Async refresh failed", e); throw e; } - }); + }, refreshExecutor); } /** diff --git a/hollow/src/test/java/com/netflix/hollow/api/consumer/HollowProducerConsumerTests.java b/hollow/src/test/java/com/netflix/hollow/api/consumer/HollowProducerConsumerTests.java index bb6bca48cc..84b847c9ba 100644 --- a/hollow/src/test/java/com/netflix/hollow/api/consumer/HollowProducerConsumerTests.java +++ b/hollow/src/test/java/com/netflix/hollow/api/consumer/HollowProducerConsumerTests.java @@ -32,6 +32,7 @@ import com.netflix.hollow.tools.compact.HollowCompactor.CompactionConfig; import java.time.Duration; import java.util.BitSet; +import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; import org.junit.Before; @@ -487,6 +488,28 @@ public void consumerFilteringSupport() { Assert.fail(); // fail if UnsupportedOperationException was not thrown } + @Test + public void consumerErrorsDuringRefreshArePropagated() { + HollowProducer producer = HollowProducer.withPublisher(blobStore) + .withAnnouncer(announcement) + .withBlobStager(new HollowInMemoryBlobStager()) + .build(); + long v1 = runCycle(producer, 1); + + InMemoryBlobStore otherBlobStore = new InMemoryBlobStore(); + HollowConsumer consumer = HollowConsumer.withBlobRetriever(otherBlobStore) + .withAnnouncementWatcher(announcement) + .build(); + + try { + consumer.triggerAsyncRefresh().toCompletableFuture().join(); + Assert.fail("Expected exception to be thrown by async refresh."); + } catch (Exception e) { + Assert.assertTrue(e instanceof CompletionException); + Assert.assertTrue(e.getCause() instanceof IllegalArgumentException); + } + } + private long runCycle(HollowProducer producer, final int cycleNumber) { return producer.runCycle(state -> state.add(cycleNumber)); }