Skip to content

Commit

Permalink
3.x: Fix Flowable.groupBy eviction-completion-replenishment problems (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored May 20, 2020
1 parent e2b1d2f commit c693edb
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,7 @@ public void onError(Throwable t) {
g.onError(t);
}
groups.clear();
if (evictedGroups != null) {
evictedGroups.clear();
}
completeEvictions();
downstream.onError(t);
}

Expand All @@ -226,10 +224,10 @@ public void onComplete() {
for (GroupedUnicast<K, V> g : groups.values()) {
g.onComplete();
}

groups.clear();
if (evictedGroups != null) {
evictedGroups.clear();
}
completeEvictions();

done = true;
downstream.onComplete();
}
Expand Down Expand Up @@ -594,6 +592,11 @@ void cleanupQueue(long emitted, boolean polled) {
while (queue.poll() != null) {
emitted++;
}

replenishParent(emitted, polled);
}

void replenishParent(long emitted, boolean polled) {
if (polled) {
emitted++;
}
Expand All @@ -618,6 +621,9 @@ boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a,
a.onError(e);
} else {
a.onComplete();
// completion doesn't mean the parent has completed
// because of evicted groups
replenishParent(emitted, polled);
}
return true;
}
Expand All @@ -632,6 +638,10 @@ boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a,
if (empty) {
cancelled.lazySet(true);
a.onComplete();

// completion doesn't mean the parent has completed
// because of evicted groups
replenishParent(emitted, polled);
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
import static org.mockito.Mockito.*;

import java.io.IOException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import org.junit.*;
import org.junit.Test;
import org.mockito.Mockito;
import org.reactivestreams.*;

Expand Down Expand Up @@ -2876,4 +2877,82 @@ public void issue6974Part2Case1ObserveOnHideLoop() {
}
}
*/

static <T> Function<Consumer<Object>, ConcurrentMap<T, Object>> ttlCapGuava(Duration ttl) {
return itemEvictConsumer ->
CacheBuilder
.newBuilder()
.expireAfterWrite(ttl)
.removalListener(n -> {
if (n.getCause() != com.google.common.cache.RemovalCause.EXPLICIT) {
try {
itemEvictConsumer.accept(n.getValue());
} catch (Throwable throwable) {
throw new RuntimeException(throwable);
}
}
}).<T, Object>build().asMap();
}

@Test
public void issue6982Case1() {
final int groups = 20;

int groupByBufferSize = 2;
int flatMapMaxConcurrency = 200 * groups;

// ~50% of executions - Not completed (latch = 1, values = 500000, errors = 0, completions = 0, timeout!,
// disposed!)

Flowable
.range(1, 500_000)
.map(i -> i % groups)
.groupBy(i -> i, i -> i, false, groupByBufferSize, ttlCapGuava(Duration.ofMillis(10)))
.flatMap(gf -> gf.observeOn(Schedulers.computation()), flatMapMaxConcurrency)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertNoErrors()
.assertComplete();
}

/*
* Disabled: Takes very long. Run it locally only.
@Test
public void issue6982Case1Loop() {
for (int i = 0; i < 200; i++) {
System.out.println("issue6982Case1Loop " + i);
issue6982Case1();
}
}
*/

@Test
public void issue6982Case2() {
final int groups = 20;

int groupByBufferSize = groups * 30;
int flatMapMaxConcurrency = groups * 500;
// Always : Not completed (latch = 1, values = 14100, errors = 0, completions = 0, timeout!, disposed!)

Flowable
.range(1, 500_000)
.map(i -> i % groups)
.groupBy(i -> i, i -> i, false, groupByBufferSize, ttlCapGuava(Duration.ofMillis(10)))
.flatMap(gf -> gf.observeOn(Schedulers.computation()), flatMapMaxConcurrency)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertNoErrors()
.assertComplete();
}

/*
* Disabled: Takes very long. Run it locally only.
@Test
public void issue6982Case2Loop() {
for (int i = 0; i < 200; i++) {
System.out.println("issue6982Case2Loop " + i);
issue6982Case2();
}
}
*/
}

0 comments on commit c693edb

Please sign in to comment.