From c693edbb3cd773252185fa5f339a4c7049a3493c Mon Sep 17 00:00:00 2001 From: David Karnok Date: Wed, 20 May 2020 21:34:06 +0200 Subject: [PATCH] 3.x: Fix Flowable.groupBy eviction-completion-replenishment problems (#6988) --- .../operators/flowable/FlowableGroupBy.java | 22 +++-- .../flowable/FlowableGroupByTest.java | 81 ++++++++++++++++++- 2 files changed, 96 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java index 8dc325fbde..f4df324110 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java @@ -214,9 +214,7 @@ public void onError(Throwable t) { g.onError(t); } groups.clear(); - if (evictedGroups != null) { - evictedGroups.clear(); - } + completeEvictions(); downstream.onError(t); } @@ -226,10 +224,10 @@ public void onComplete() { for (GroupedUnicast g : groups.values()) { g.onComplete(); } + groups.clear(); - if (evictedGroups != null) { - evictedGroups.clear(); - } + completeEvictions(); + done = true; downstream.onComplete(); } @@ -594,6 +592,11 @@ void cleanupQueue(long emitted, boolean polled) { while (queue.poll() != null) { emitted++; } + + replenishParent(emitted, polled); + } + + void replenishParent(long emitted, boolean polled) { if (polled) { emitted++; } @@ -618,6 +621,9 @@ boolean checkTerminated(boolean d, boolean empty, Subscriber a, a.onError(e); } else { a.onComplete(); + // completion doesn't mean the parent has completed + // because of evicted groups + replenishParent(emitted, polled); } return true; } @@ -632,6 +638,10 @@ boolean checkTerminated(boolean d, boolean empty, Subscriber a, if (empty) { cancelled.lazySet(true); a.onComplete(); + + // completion doesn't mean the parent has completed + // because of evicted groups + replenishParent(emitted, polled); return true; } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java index e606d821f9..bfb467f424 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java @@ -18,11 +18,12 @@ import static org.mockito.Mockito.*; import java.io.IOException; +import java.time.Duration; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import org.junit.*; +import org.junit.Test; import org.mockito.Mockito; import org.reactivestreams.*; @@ -2876,4 +2877,82 @@ public void issue6974Part2Case1ObserveOnHideLoop() { } } */ + + static Function, ConcurrentMap> ttlCapGuava(Duration ttl) { + return itemEvictConsumer -> + CacheBuilder + .newBuilder() + .expireAfterWrite(ttl) + .removalListener(n -> { + if (n.getCause() != com.google.common.cache.RemovalCause.EXPLICIT) { + try { + itemEvictConsumer.accept(n.getValue()); + } catch (Throwable throwable) { + throw new RuntimeException(throwable); + } + } + }).build().asMap(); + } + + @Test + public void issue6982Case1() { + final int groups = 20; + + int groupByBufferSize = 2; + int flatMapMaxConcurrency = 200 * groups; + + // ~50% of executions - Not completed (latch = 1, values = 500000, errors = 0, completions = 0, timeout!, + // disposed!) + + Flowable + .range(1, 500_000) + .map(i -> i % groups) + .groupBy(i -> i, i -> i, false, groupByBufferSize, ttlCapGuava(Duration.ofMillis(10))) + .flatMap(gf -> gf.observeOn(Schedulers.computation()), flatMapMaxConcurrency) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertNoErrors() + .assertComplete(); + } + + /* + * Disabled: Takes very long. Run it locally only. + @Test + public void issue6982Case1Loop() { + for (int i = 0; i < 200; i++) { + System.out.println("issue6982Case1Loop " + i); + issue6982Case1(); + } + } + */ + + @Test + public void issue6982Case2() { + final int groups = 20; + + int groupByBufferSize = groups * 30; + int flatMapMaxConcurrency = groups * 500; + // Always : Not completed (latch = 1, values = 14100, errors = 0, completions = 0, timeout!, disposed!) + + Flowable + .range(1, 500_000) + .map(i -> i % groups) + .groupBy(i -> i, i -> i, false, groupByBufferSize, ttlCapGuava(Duration.ofMillis(10))) + .flatMap(gf -> gf.observeOn(Schedulers.computation()), flatMapMaxConcurrency) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertNoErrors() + .assertComplete(); + } + + /* + * Disabled: Takes very long. Run it locally only. + @Test + public void issue6982Case2Loop() { + for (int i = 0; i < 200; i++) { + System.out.println("issue6982Case2Loop " + i); + issue6982Case2(); + } + } + */ }