Skip to content

Commit

Permalink
fix: CffuFactory.defaultExecutor returns unwrapped executor to pres…
Browse files Browse the repository at this point in the history
…erve `CompletableFuture` screening behavior(`CF#screenExecutor`) 😎
  • Loading branch information
oldratlee committed Feb 22, 2025
1 parent 28af2c9 commit dc7ad9b
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 103 deletions.
116 changes: 59 additions & 57 deletions cffu-core/src/main/java/io/foldright/cffu/Cffu.java

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cffu-core/src/main/java/io/foldright/cffu/CffuFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -1279,7 +1279,7 @@ public static <T> Cffu<T>[] cffuListToArray(List<Cffu<T>> cffuList) {
*/
@Contract(pure = true)
public Executor defaultExecutor() {
return cffuExecutor;
return cffuExecutor.original;
}

/**
Expand Down
60 changes: 40 additions & 20 deletions cffu-core/src/test/java/io/foldright/cffu/CffuFactoryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import io.foldright.cffu.tuple.Tuple5;
import io.foldright.test_utils.MinStageTestUtils;
import io.foldright.test_utils.TestUtils;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledForJreRange;
import org.junit.jupiter.api.condition.JRE;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand All @@ -22,6 +24,7 @@
import static io.foldright.test_utils.TestingConstants.*;
import static io.foldright.test_utils.TestingExecutorUtils.testCffuFac;
import static io.foldright.test_utils.TestingExecutorUtils.testExecutor;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.ForkJoinPool.commonPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -770,8 +773,7 @@ void test_toCffu() throws Exception {
CffuFactory fac = CffuFactory.builder(dummyExecutor).forbidObtrudeMethods(true).build();
Cffu<Integer> cffu = fac.toCffu(cffu_in);
assertNotSame(cffu_in, cffu);
// FIXME
// assertSame(dummyExecutor, unwrapMadeExecutor(cffu));
assertSame(dummyExecutor, cffu.defaultExecutor());
assertSame(fac, cffu.cffuFactory());
assertEquals("obtrude methods is forbidden by cffu", assertThrowsExactly(UnsupportedOperationException.class, () ->
cffu.obtrudeValue(anotherN)
Expand Down Expand Up @@ -873,26 +875,23 @@ void test_cffuListToArray() {

@Test
void test_getter() {
assertSame(testExecutor, testCffuFac.defaultExecutor());
assertThat(testCffuFac.cffuExecutor.toString()).startsWith("CffuExecutorWrapper, original: ");

// FIXME
// assertSame(testExecutor, unwrapMadeExecutor(testCffuFac));
assertSame(testExecutor, testCffuFac.defaultExecutor());
assertFalse(testCffuFac.forbidObtrudeMethods());

CffuFactory fac = CffuFactory.builder(dummyExecutor).forbidObtrudeMethods(true).build();
// FIXME
// assertSame(dummyExecutor, unwrapMadeExecutor(fac));
assertSame(dummyExecutor, fac.defaultExecutor());
assertTrue(fac.forbidObtrudeMethods());

final CffuFactory fac2 = testCffuFac.withDefaultExecutor(dummyExecutor);
// FIXME
// assertSame(dummyExecutor, unwrapMadeExecutor(fac2));
assertSame(dummyExecutor, fac2.defaultExecutor());
assertEquals(testCffuFac.forbidObtrudeMethods(), fac2.forbidObtrudeMethods());

// FIXME
// final CffuFactory fac3 = testCffuFac.withDefaultExecutor(fac2.defaultExecutor());
// assertSame(fac2.defaultExecutor(), fac3.defaultExecutor());
// assertEquals(fac2.forbidObtrudeMethods(), fac3.forbidObtrudeMethods());
final CffuFactory fac3 = testCffuFac.withDefaultExecutor(fac2.defaultExecutor());
assertSame(fac2.defaultExecutor(), fac3.defaultExecutor());
assertEquals(fac2.forbidObtrudeMethods(), fac3.forbidObtrudeMethods());
}

@Test
Expand All @@ -910,20 +909,41 @@ void test_forbidObtrudeMethods_property() {
}

@Test
void test_executorSetting_MayBe_ThreadPerTaskExecutor() throws Exception {
void test_executorSetting_commonPool() throws Exception {
CffuFactory fac = CffuFactory.builder(commonPool()).build();
assertSame(commonPool(), (fac).defaultExecutor());

assertEquals(n, fac.supplyAsync(() -> n).get());
}

// FIXME bug to be fixed!
@Disabled
@Test
void test_executorSetting_MayRunIn_ThreadPerTaskExecutor() throws Exception {
final boolean USE_COMMON_POOL = ForkJoinPool.getCommonPoolParallelism() > 1;
final String threadNamePrefixOfCommonPool = "ForkJoinPool.commonPool-worker-";

CffuFactory fac = CffuFactory.builder(commonPool()).build();
if (USE_COMMON_POOL) {
// FIXME
// assertSame(commonPool(), unwrapMadeExecutor(fac));
fac.runAsync(() -> {
assertThat(currentThread().getName()).startsWith(threadNamePrefixOfCommonPool);
}).join();
} else {
// FIXME
// String executorClassName = unwrapMadeExecutor(fac).getClass().getName();
// assertTrue(executorClassName.endsWith("$ThreadPerTaskExecutor"));
final int COUNT = 100;

List<Cffu<Void>> cffuList = new ArrayList<>(COUNT);
CopyOnWriteArraySet<Thread> runThreads = new CopyOnWriteArraySet<>();
for (int i = 0; i < COUNT; i++) {
final Cffu<Void> cffu = fac.runAsync(() -> {
assertThat(currentThread().getName()).doesNotStartWith(threadNamePrefixOfCommonPool);
runThreads.add(currentThread());
});
cffuList.add(cffu);
}
CompletableFutureUtils.allOf(CffuFactory.cffuListToArray(cffuList)).join();

assertThat(runThreads).hasSize(COUNT);
}

assertEquals(n, fac.supplyAsync(() -> n).get());
}

// endregion
Expand Down
3 changes: 1 addition & 2 deletions cffu-core/src/test/java/io/foldright/cffu/CffuTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,7 @@ void test_withCffuFactory() {

Executor executor = Runnable::run;
final Cffu<Integer> f2 = cf.withDefaultExecutor(executor);
// FIXME
// assertSame(executor, unwrapMadeExecutor(f2));
assertSame(executor, f2.defaultExecutor());
assertEquals(testCffuFac.forbidObtrudeMethods(), f2.cffuFactory().forbidObtrudeMethods());
}

Expand Down
9 changes: 4 additions & 5 deletions cffu-core/src/test/java/io/foldright/cffu/CffuTestHelper.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

package io.foldright.cffu

import org.junit.jupiter.api.Assertions.assertSame
import org.junit.jupiter.api.Assertions.assertTrue
import io.kotest.matchers.string.shouldEndWith
import io.kotest.matchers.types.shouldBeSameInstanceAs
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.concurrent.Executor
Expand All @@ -24,9 +24,8 @@ class FooCs<T>(cf: CompletableFuture<T>) : CompletionStage<T> by cf
fun assertIsCfDefaultExecutor(executor: Executor) {
val useCommonPool = ForkJoinPool.getCommonPoolParallelism() > 1
if (useCommonPool) {
assertSame(ForkJoinPool.commonPool(), executor)
executor shouldBeSameInstanceAs ForkJoinPool.commonPool()
} else {
val executorClassName = executor.javaClass.name
assertTrue(executorClassName.endsWith("\$ThreadPerTaskExecutor"))
executor.javaClass.name shouldEndWith "\$ThreadPerTaskExecutor"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1942,8 +1942,7 @@ void test_defaultExecutor() {
assertIsCfDefaultExecutor(defaultExecutor(new CustomizedExecutorCf<>()));

// Cffu
// FIXME
// assertSame(testExecutor, unwrapMadeExecutor(defaultExecutor(testCffuFac.completedFuture(null))));
assertSame(testExecutor, defaultExecutor(testCffuFac.completedFuture(null)));

final UnsupportedOperationException ex = assertThrowsExactly(UnsupportedOperationException.class,
() -> defaultExecutor(new FooCs<>(new CompletableFuture<>())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,64 @@ package io.foldright.cffu.spi

import io.foldright.cffu.CffuFactory
import io.foldright.cffu.getOriginalExecutor
import io.foldright.test_utils.testExecutor
import io.foldright.cffu.getScreenedExecutor
import io.foldright.cffu.getUnscreenedExecutor
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.shouldBeSameInstanceAs
import io.kotest.matchers.types.shouldNotBeSameInstanceAs
import java.util.concurrent.Executor


class ExecutorWrapperProviderTest : FunSpec({

val name = "Test executor of ExecutorWrapperProviderTest"
val executor = object : Executor {
override fun execute(command: Runnable) = command.run()
override fun toString(): String = name
}

test("disable TestExecutorWrapper") {
val factory = CffuFactory.builder(testExecutor).build()
val count = TestExecutorWrapperProvider.executionCounter.get()

val factory = CffuFactory.builder(executor).build()
val cffu = factory.runAsync {}
cffu.getOriginalExecutor() shouldBeSameInstanceAs testExecutor
cffu.defaultExecutor() shouldBeSameInstanceAs executor
cffu.getOriginalExecutor() shouldBeSameInstanceAs executor
cffu.getScreenedExecutor() shouldNotBeSameInstanceAs executor
cffu.getUnscreenedExecutor() shouldNotBeSameInstanceAs executor

cffu.join()
TestExecutorWrapperProvider.executionCounter.get() shouldBe count

cffu.getScreenedExecutor().execute {}
TestExecutorWrapperProvider.executionCounter.get() shouldBe count
cffu.getUnscreenedExecutor().execute {}
TestExecutorWrapperProvider.executionCounter.get() shouldBe count
}

test("enable TestExecutorWrapper") {
enableTestExecutorWrapper()

val factory = CffuFactory.builder(testExecutor).build()
val cffu = factory.runAsync {}
// FIXME MORE test
// test the wrapped BEHAVIOR, not the wrapper instance
cffu.getOriginalExecutor() shouldBeSameInstanceAs testExecutor
val count = TestExecutorWrapperProvider.executionCounter.get()

val factory = CffuFactory.builder(executor).build()
val cffu = factory.runAsync {
println("Starting executor")
}
cffu.defaultExecutor() shouldBeSameInstanceAs executor
cffu.getOriginalExecutor() shouldBeSameInstanceAs executor
cffu.getScreenedExecutor() shouldNotBeSameInstanceAs executor
cffu.getUnscreenedExecutor() shouldNotBeSameInstanceAs executor


cffu.join()
TestExecutorWrapperProvider.executionCounter.get().shouldBe(count + 1)

cffu.getScreenedExecutor().execute {}
TestExecutorWrapperProvider.executionCounter.get().shouldBe(count + 2)
cffu.getUnscreenedExecutor().execute {}
TestExecutorWrapperProvider.executionCounter.get().shouldBe(count + 3)
}

beforeTest {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package io.foldright.cffu.spi

import java.util.concurrent.Executor
import java.util.concurrent.atomic.AtomicLong


class TestExecutorWrapperProvider : ExecutorWrapperProvider {
override fun wrap(executor: Executor): Executor =
if (isTestExecutorWrapperEnabled()) object : Executor by executor {}
else executor
if (isTestExecutorWrapperEnabled()) Executor { command ->
executionCounter.incrementAndGet()
executor.execute(command)
} else executor

companion object {
val executionCounter = AtomicLong(0)
}
}

private const val PROPERTY_NAME = "cffu.test.executor.wrapper"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ fun <T> Array<out CompletionStage<T>>.toCffu(cffuFactory: CffuFactory): Array<Cf
// - Runnable[] -> Cffu<Void>
////////////////////////////////////////////////////////////

// FIXME Do NOT use
// executor: Executor = cffuFactory.defaultExecutor()
// as default value, need a Poison Object!

/**
* Returns a new Cffu that is asynchronously completed
* by tasks running in the Cffu's default asynchronous execution facility
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,13 @@ class CffuExtensionsTest : FunSpec({
suspend fun checkToCffu(cffu: Cffu<Int>, n: Int) {
cffu.await() shouldBe n

// FIXME
// cffu.unwrapMadeExecutor() shouldBeSameInstanceAs testExecutor
// cffu.cffuFactory() shouldBeSameInstanceAs testCffuFac
cffu.defaultExecutor() shouldBeSameInstanceAs testExecutor
cffu.cffuFactory() shouldBeSameInstanceAs testCffuFac

val fac2 = CffuFactory.builder(testFjExecutor).build()
cffu.withCffuFactory(fac2).let {
// FIXME
// it.unwrapMadeExecutor() shouldBeSameInstanceAs testFjExecutor
// it.cffuFactory() shouldBeSameInstanceAs fac2
it.defaultExecutor() shouldBeSameInstanceAs testFjExecutor
it.cffuFactory() shouldBeSameInstanceAs fac2
}
}

Expand Down

0 comments on commit dc7ad9b

Please sign in to comment.