diff --git a/README.md b/README.md
index 95fc111..06c7506 100644
--- a/README.md
+++ b/README.md
@@ -1,15 +1,18 @@
-# jox
+# Jox
[data:image/s3,"s3://crabby-images/936fe/936fec9a2205585a6d469fc4db8adb99e69d01f1" alt="Ideas, suggestions, problems, questions"](https://softwaremill.community/c/open-source/11)
[data:image/s3,"s3://crabby-images/e1e7f/e1e7fe45b2a809bb1b75c7b149270d4c3faf97ab" alt="CI"](https://github.com/softwaremill/jox/actions?query=workflow%3A%22CI%22)
[data:image/s3,"s3://crabby-images/b9b6c/b9b6cf1ea5e1c993f1dd8e67480f94726f0200d3" alt="Maven Central"](https://maven-badges.herokuapp.com/maven-central/com.softwaremill.jox/core)
[data:image/s3,"s3://crabby-images/bd696/bd69697a983ec79cc5b86d08ce400feef9626f6d" alt="javadoc"](https://javadoc.io/doc/com.softwaremill.jox/core)
-Fast and Scalable Channels in Java. Designed to be used with Java 21+ and virtual threads,
-see [Project Loom](https://openjdk.org/projects/loom/) (although the `core` module can be used with Java 17+).
+Modern concurrency for Java 21+ (backed by virtual threads, see [Project Loom](https://openjdk.org/projects/loom/)).
+Includes:
-Inspired by the "Fast and Scalable Channels in Kotlin Coroutines" [paper](https://arxiv.org/abs/2211.04986), and
-the [Kotlin implementation](https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt).
+* Fast and Scalable Channels in Java. Inspired by the "Fast and Scalable Channels in Kotlin Coroutines"
+ [paper](https://arxiv.org/abs/2211.04986), and
+ the [Kotlin implementation](https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt).
+* Programmer-friendly structured concurrency
+* Blocking, synchronous, functional streaming operators
JavaDocs can be browsed
at [https://javadoc.io](https://www.javadoc.io/doc/com.softwaremill.jox/core/latest/com.softwaremill.jox/com/softwaremill/jox/package-summary.html).
@@ -25,7 +28,17 @@ Videos:
* [A 10-minute introduction to Jox](https://www.youtube.com/watch?v=Ss9b1HpPDt0)
* [Passing control information through channels](https://www.youtube.com/watch?v=VjiCzaiRro8)
-## Dependencies
+For a Scala version, see the [Ox project](https://github.com/softwaremill/ox).
+
+## Table of contents
+
+* [Channels](#channels)
+* [Structured concurrency](#structured-concurrency)
+* [Streaming](#streaming)
+
+## Channels
+
+### Dependency
Maven:
@@ -33,7 +46,7 @@ Maven:
com.softwaremill.jox
- core
+ channels0.2.1
```
@@ -41,18 +54,12 @@ Maven:
Gradle:
```groovy
-implementation 'com.softwaremill.jox:core:0.2.1'
-```
-
-SBT:
-
-```scala
-libraryDependencies += "com.softwaremill.jox" % "core" % "0.2.1"
+implementation 'com.softwaremill.jox:channels:0.2.1'
```
-## Usage
+### Usage
-### Rendezvous channel
+#### Rendezvous channel
```java
import com.softwaremill.jox.Channel;
@@ -84,7 +91,7 @@ class Demo1 {
}
```
-### Buffered channel
+#### Buffered channel
```java
import com.softwaremill.jox.Channel;
@@ -113,7 +120,7 @@ class Demo2 {
Unlimited channels can be created with `Channel.newUnlimitedChannel()`. Such channels will never block on send().
-### Closing a channel
+#### Closing a channel
Channels can be closed, either because the source is `done` with sending values, or when there's an `error` while
the sink processes the received values.
@@ -144,7 +151,7 @@ class Demo3 {
}
```
-### Selecting from multiple channels
+#### Selecting from multiple channels
The `select` method selects exactly one clause to complete. For example, you can receive a value from exactly one
channel:
@@ -224,7 +231,7 @@ class Demo6 {
}
```
-## Performance
+### Performance
The project includes benchmarks implemented using JMH - both for the `Channel`, as well as for some built-in Java
synchronisation primitives (queues), as well as the Kotlin channel implementation.
@@ -314,6 +321,263 @@ ParallelKotlinBenchmark.parallelChannels_defaultDispatcher 16
ChainedKotlinBenchmark.channelChain_defaultDispatcher 16 10000 avgt 20 6.039 ± 0.826 ns/op
```
+## Structured concurrency
+
+### Dependency
+
+Maven:
+
+```xml
+
+
+ com.softwaremill.jox
+ structured
+ 0.2.1
+
+```
+
+Gradle:
+
+```groovy
+implementation 'com.softwaremill.jox:structured:0.2.1'
+```
+
+### Usage
+
+#### Creating scopes and forking computations
+
+```java
+import java.util.concurrent.ExecutionException;
+
+import static com.softwaremill.jox.structured.Scopes.supervised;
+
+public class Demo {
+ public static void main(String[] args) throws ExecutionException, InterruptedException {
+ var result = supervised(scope -> {
+ var f1 = scope.fork(() -> {
+ Thread.sleep(500);
+ return 5;
+ });
+ var f2 = scope.fork(() -> {
+ Thread.sleep(1000);
+ return 6;
+ });
+ return f1.join() + f2.join();
+ });
+ System.out.println("result = " + result);
+ }
+}
+```
+
+* the `supervised` scope will only complete once any forks started within complete as well
+* in other words, it's guaranteed that no forks will remain running, after a `supervised` block completes
+* `fork` starts a concurrently running computation, which can be joined in a blocking way. These computatioins are
+ backed by virtual threads
+
+#### Error handling in scopes
+
+```java
+import java.util.concurrent.ExecutionException;
+
+import static com.softwaremill.jox.structured.Scopes.supervised;
+
+public class Demo {
+ public static void main(String[] args) throws ExecutionException, InterruptedException {
+ var result = supervised(scope -> {
+ var f1 = scope.fork(() -> {
+ Thread.sleep(1000);
+ return 6;
+ });
+ var f2 = scope.fork(() -> {
+ Thread.sleep(500);
+ throw new RuntimeException("I can’t count to 5!");
+ });
+ return f1.join() + f2.join();
+ });
+ System.out.println("result = " + result);
+ }
+}
+```
+
+* an exception thrown from the scope's body, or from any of the forks, causes the scope to end
+* any forks that are still running are then interrupted
+* once all forks complete, an `ExecutionException` is thrown by the `supervised` method
+* the cause of the `ExecutionException` is the original exception
+* any other exceptions (e.g. `InterruptedExceptions`) that have been thrown while ending the scope, are added as
+ suppressed
+
+Jox implements the "let it crash" model. When an error occurs, the entire scope ends, propagating the exception higher,
+so that it can be properly handled. Moreover, no detail is lost: all exceptions are preserved, either as causes, or
+suppressed exceptions.
+
+#### Other types of scopes & forks
+
+There are 4 types of forks:
+
+* `fork`: daemon fork, supervised; when the scope's body ends, such forks are interrupted
+* `forkUser`: user fork, supervised; when the scope's body ends, the scope's method waits until such a fork completes
+ normally
+* `forkUnsupervised`: daemon fork, unsupervised; any thrown exceptions don't cause the scope to end, but instead can be
+ discovered when the fork is `.join`ed
+* `forkCancellable`: daemon fork, unsupervised, which can be manually cancelled (interrupted)
+
+There are also 2 types of scopes:
+
+* `supervised`: the default scope, which ends when all forks user forks complete successfully, or when there's any
+ exception in supervised scopes
+* `unsupervised`: a scope where only unsupervised forks can be started
+
+#### Running computations in parallel
+
+```java
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static com.softwaremill.jox.structured.Par.par;
+
+public class Demo {
+ public static void main(String[] args) throws ExecutionException, InterruptedException {
+ var result = par(List.of(() -> {
+ Thread.sleep(500);
+ return 5;
+ }, () -> {
+ Thread.sleep(1000);
+ return 6;
+ }));
+ System.out.println("result = " + result);
+ }
+}
+// result = [5, 6]
+```
+
+Uses `supervised` scopes underneath.
+
+#### Racing computations
+
+```java
+import java.util.concurrent.ExecutionException;
+
+import static com.softwaremill.jox.structured.Race.race;
+
+public class Demo {
+ public static void main(String[] args) throws ExecutionException, InterruptedException {
+ var result = race(() -> {
+ Thread.sleep(1000);
+ return 10;
+ }, () -> {
+ Thread.sleep(500);
+ return 5;
+ });
+ // result will be 5, the other computation will be interrupted on the Thread.sleep
+ System.out.println("result = " + result);
+ }
+}
+// result = 5
+```
+
+#### Timing out a computation
+
+```java
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static com.softwaremill.jox.structured.Race.timeout;
+
+public class Demo {
+ public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
+ var result = timeout(1000, () -> {
+ Thread.sleep(500);
+ return 5;
+ });
+ System.out.println("result = " + result);
+ }
+}
+// result = 5
+```
+
+## Streaming
+
+### Dependency
+
+Maven:
+
+```xml
+
+
+ com.softwaremill.jox
+ channel-ops
+ 0.2.1
+
+```
+
+Gradle:
+
+```groovy
+implementation 'com.softwaremill.jox:channel-ops:0.2.1'
+```
+
+### Usage
+
+Using this module you can run operations on streams which require starting background threads. To do that,
+you need to pass an active concurrency scope (started using `supervised`) to the `SourceOps` constructor.
+
+Each method from `SourceOps` causes a new fork (virtual thread) to be started, which starts running its logic
+immediately (producing elements / consuming and transforming elements from the given source). Thus, this is an
+implementation of "hot streams".
+
+#### Creating streams
+
+Sources from iterables, or tick-sources, can be created by calling methods on `SourceOps`:
+
+```java
+import java.util.concurrent.ExecutionException;
+
+import static com.softwaremill.jox.structured.Scopes.supervised;
+
+public class Demo {
+ public static void main(String[] args) throws ExecutionException, InterruptedException {
+ supervised(scope -> {
+ new SourceOps(scope)
+ .tick(500, "tick")
+ .toSource()
+ .forEach(v -> System.out.println(v));
+ return null; // unreachable, as `tick` produces infinitely many elements
+ });
+ }
+}
+```
+
+A tick-source can also be used in the usual way, by calling `.receive` on it, or by using it in `select`'s clauses.
+
+#### Transforming streams
+
+Streams can be transformed by calling the appropriate methods on the object returned by
+`SourceOps.forSource(scope, source)`.
+
+`collect` combines the functionality of `map` and `filter`: elements are mapped, and when the mapping function returns
+`null`, the element is skipped:
+
+```java
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static com.softwaremill.jox.structured.Scopes.supervised;
+
+public class Demo {
+ public static void main(String[] args) throws ExecutionException, InterruptedException {
+ var result = supervised(scope -> new SourceOps(scope)
+ .fromIterable(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
+ .collect(n -> {
+ if (n % 2 == 0) return null;
+ else return n * 10;
+ })
+ .toSource().toList());
+ System.out.println("result = " + result);
+ }
+}
+// result = [10, 30, 50, 70, 90]
+```
+
## Feedback
Is what we are looking for!
diff --git a/bench/pom.xml b/bench/pom.xml
index cc4b96d..2754395 100644
--- a/bench/pom.xml
+++ b/bench/pom.xml
@@ -47,16 +47,6 @@
-
- org.apache.maven.plugins
- maven-compiler-plugin
- 3.11.0
-
-
- --enable-preview
-
-
- org.apache.maven.pluginsmaven-shade-plugin
diff --git a/channel-ops/pom.xml b/channel-ops/pom.xml
new file mode 100644
index 0000000..dd19617
--- /dev/null
+++ b/channel-ops/pom.xml
@@ -0,0 +1,38 @@
+
+
+ 4.0.0
+
+
+ com.softwaremill.jox
+ parent
+ 0.2.1
+
+
+ channel-ops
+ 0.2.1
+ jar
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ test
+
+
+ org.awaitility
+ awaitility
+ test
+
+
+ com.softwaremill.jox
+ channels
+ 0.2.1
+
+
+ com.softwaremill.jox
+ structured
+ 0.2.1
+
+
+
diff --git a/channel-ops/src/main/java/com/softwaremill/jox/ops/SourceOps.java b/channel-ops/src/main/java/com/softwaremill/jox/ops/SourceOps.java
new file mode 100644
index 0000000..04d54d0
--- /dev/null
+++ b/channel-ops/src/main/java/com/softwaremill/jox/ops/SourceOps.java
@@ -0,0 +1,134 @@
+package com.softwaremill.jox.ops;
+
+import com.softwaremill.jox.*;
+import com.softwaremill.jox.structured.Scope;
+
+import java.util.Iterator;
+import java.util.function.Function;
+
+public class SourceOps {
+ private final Scope scope;
+ private final int defaultCapacity;
+
+ public SourceOps(Scope scope) {
+ this(scope, 16);
+ }
+
+ public SourceOps(Scope scope, int defaultCapacity) {
+ this.scope = scope;
+ this.defaultCapacity = defaultCapacity;
+ }
+
+ public static ForSource forSource(Scope scope, Source s) {
+ var sourceOps = new SourceOps(scope);
+ return sourceOps.new ForSource<>(s);
+ }
+
+ public class ForSource {
+ private final Source source;
+
+ ForSource(Source source) {
+ this.source = source;
+ }
+
+ public Source toSource() {
+ return source;
+ }
+
+ /**
+ * Applies the given mapping function {@code f} to each element received from this source, and sends the
+ * results to the returned channel. If {@code f} returns {@code null}, the value will be skipped.
+ *
+ * Errors from this channel are propagated to the returned channel. Any exceptions that occur when invoking
+ * {@code f} are propagated as errors to the returned channel as well.
+ *
+ * For a lazily-evaluated version, see {@link Channel#collectAsView(Function)}.
+ *
+ * @param f The mapping function.
+ * @return Ops on a source, onto which results of the mapping function will be sent.
+ */
+ public ForSource collect(Function f) {
+ var c2 = new Channel(defaultCapacity);
+ scope.fork(() -> {
+ var repeat = true;
+ while (repeat) {
+ switch (source.receiveOrClosed()) {
+ case ChannelDone cd -> {
+ c2.doneOrClosed();
+ repeat = false;
+ }
+ case ChannelError ce -> {
+ c2.errorOrClosed(ce.cause());
+ repeat = false;
+ }
+ case Object t -> {
+ try {
+ var u = f.apply((T) t);
+ if (u != null) {
+ repeat = !(c2.sendOrClosed(u) instanceof ChannelClosed);
+ } // else skip & continue
+ } catch (Exception e) {
+ c2.errorOrClosed(e);
+ }
+ }
+ }
+ }
+ return null;
+ });
+ return new ForSource(c2);
+ }
+ }
+
+ //
+
+ public ForSource fromIterator(Iterator i) {
+ var c = new Channel(defaultCapacity);
+ scope.fork(() -> {
+ try {
+ while (i.hasNext()) {
+ c.sendOrClosed(i.next());
+ }
+ c.doneOrClosed();
+ } catch (Exception e) {
+ c.errorOrClosed(e);
+ }
+ return null;
+ });
+ return new ForSource(c);
+ }
+
+ public ForSource fromIterable(Iterable i) {
+ return fromIterator(i.iterator());
+ }
+
+ /**
+ * Creates a rendezvous channel (without a buffer, regardless of the default capacity), to which the given value is
+ * sent repeatedly, at least {@code intervalMillis}ms apart between each two elements. The first value is sent
+ * immediately.
+ *
+ * The interval is measured between the subsequent invocations of the {@code send(value)} method. Hence, if there's
+ * a slow consumer, the next tick can be sent right after the previous one is received (if it was received later
+ * than the inter-tick interval duration). However, ticks don't accumulate, e.g. when the consumer is so slow that
+ * multiple intervals pass between {@code send} invocations.
+ *
+ * Must be run within a scope, since a child fork is created which sends the ticks, and waits until the next tick
+ * can be sent.
+ *
+ * @param intervalMillis The temporal spacing between subsequent ticks.
+ * @param tickValue The value to send to the channel on every tick.
+ * @return Ops on a source to which the tick values are sent.
+ */
+ public ForSource tick(long intervalMillis, T tickValue) {
+ var c = new Channel();
+ scope.fork(() -> {
+ while (true) {
+ var start = System.nanoTime();
+ c.sendOrClosed(tickValue);
+ var end = System.nanoTime();
+ var sleep = intervalMillis * 1_000_000 - (end - start);
+ if (sleep > 0) Thread.sleep(sleep / 1_000_000, (int) sleep % 1_000_000);
+ }
+ });
+ return new ForSource(c);
+ }
+}
diff --git a/channel-ops/src/test/java/com/softwaremill/jox/ops/SourceOpsCollectTest.java b/channel-ops/src/test/java/com/softwaremill/jox/ops/SourceOpsCollectTest.java
new file mode 100644
index 0000000..8e09e12
--- /dev/null
+++ b/channel-ops/src/test/java/com/softwaremill/jox/ops/SourceOpsCollectTest.java
@@ -0,0 +1,101 @@
+package com.softwaremill.jox.ops;
+
+import com.softwaremill.jox.Channel;
+import com.softwaremill.jox.ChannelDone;
+import com.softwaremill.jox.Source;
+import com.softwaremill.jox.structured.Scopes;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SourceOpsCollectTest {
+ @Test
+ void testMapOverSource() throws Exception {
+ Scopes.supervised(scope -> {
+ Channel c = new Channel();
+ scope.fork(() -> {
+ c.send(1);
+ c.send(2);
+ c.send(3);
+ c.done();
+ return null;
+ });
+
+ Source s = SourceOps.forSource(scope, c).collect(x -> x * 10).toSource();
+
+ assertEquals(10, s.receive());
+ assertEquals(20, s.receive());
+ assertEquals(30, s.receive());
+ assertEquals(new ChannelDone(), s.receiveOrClosed());
+ return null;
+ });
+ }
+
+ @Test
+ void testCollectOverSource() throws Exception {
+ Scopes.supervised(scope -> {
+ Channel c = new Channel();
+ scope.fork(() -> {
+ c.send(1);
+ c.send(2);
+ c.send(3);
+ c.send(4);
+ c.send(5);
+ c.done();
+ return null;
+ });
+
+ Source s = SourceOps.forSource(scope, c).collect(x -> {
+ if (x % 2 == 0) return x * 10;
+ else return null;
+ }).toSource();
+
+ assertEquals(20, s.receive());
+ assertEquals(40, s.receive());
+ assertEquals(new ChannelDone(), s.receiveOrClosed());
+ return null;
+ });
+ }
+
+ @Test
+ void testCollectOverSourceStressTest() throws Exception {
+ for (int i = 0; i < 100000; i++) {
+ Scopes.supervised(scope -> {
+ Channel c = new Channel();
+ scope.fork(() -> {
+ c.send(1);
+ c.done();
+ return null;
+ });
+
+ Source s = SourceOps.forSource(scope, c).collect(x -> x * 10).toSource();
+
+ assertEquals(10, s.receive());
+ assertEquals(new ChannelDone(), s.receiveOrClosed());
+ return null;
+ });
+ }
+ }
+
+ @Test
+ void testCollectOverSourceUsingForSyntax() throws Exception {
+ Scopes.supervised(scope -> {
+ Channel c = new Channel();
+ scope.fork(() -> {
+ c.send(1);
+ c.send(2);
+ c.send(3);
+ c.done();
+ return null;
+ });
+
+ Source s = SourceOps.forSource(scope, c).collect(x -> x * 2).toSource();
+
+ assertEquals(2, s.receive());
+ assertEquals(4, s.receive());
+ assertEquals(6, s.receive());
+ assertEquals(new ChannelDone(), s.receiveOrClosed());
+ return null;
+ });
+ }
+}
diff --git a/channel-ops/src/test/java/com/softwaremill/jox/ops/SourceOpsTickTest.java b/channel-ops/src/test/java/com/softwaremill/jox/ops/SourceOpsTickTest.java
new file mode 100644
index 0000000..5f27da3
--- /dev/null
+++ b/channel-ops/src/test/java/com/softwaremill/jox/ops/SourceOpsTickTest.java
@@ -0,0 +1,51 @@
+package com.softwaremill.jox.ops;
+
+import com.softwaremill.jox.structured.Scopes;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class SourceOpsTickTest {
+ @Test
+ void testTickRegularly() throws Exception {
+ Scopes.supervised(scope -> {
+ long start = System.currentTimeMillis();
+ var c = new SourceOps(scope).tick(100, "tick").toSource();
+
+ assertEquals("tick", c.receive());
+ long elapsed = System.currentTimeMillis() - start;
+ assertTrue(elapsed >= 0L && elapsed <= 50L);
+
+ assertEquals("tick", c.receive());
+ elapsed = System.currentTimeMillis() - start;
+ assertTrue(elapsed >= 100L && elapsed <= 150L);
+
+ assertEquals("tick", c.receive());
+ elapsed = System.currentTimeMillis() - start;
+ assertTrue(elapsed >= 200L && elapsed <= 250L);
+
+ return null;
+ });
+ }
+
+ @Test
+ void testTickImmediatelyInCaseOfSlowConsumerAndThenResumeNormal() throws Exception {
+ Scopes.supervised(scope -> {
+ long start = System.currentTimeMillis();
+ var c = new SourceOps(scope).tick(100, "tick").toSource();
+
+ // Simulating a slow consumer
+ Thread.sleep(200);
+ assertEquals("tick", c.receive()); // a tick should be waiting
+ long elapsed = System.currentTimeMillis() - start;
+ assertTrue(elapsed >= 200L && elapsed <= 250L);
+
+ assertEquals("tick", c.receive()); // and immediately another, as the interval between send-s has passed
+ elapsed = System.currentTimeMillis() - start;
+ assertTrue(elapsed >= 200L && elapsed <= 250L);
+
+ return null;
+ });
+ }
+}
diff --git a/channels/pom.xml b/channels/pom.xml
index d2e7c73..4b43a13 100644
--- a/channels/pom.xml
+++ b/channels/pom.xml
@@ -13,50 +13,16 @@
0.2.1jar
-
- UTF-8
- 17
- 17
- 21
- 21
-
-
org.junit.jupiterjunit-jupiter
- 5.10.1testorg.awaitilityawaitility
- 4.2.0test
-
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
- 3.12.1
-
-
- true
- --enable-preview
-
-
-
- org.apache.maven.plugins
- maven-surefire-plugin
- 3.2.2
-
- --enable-preview
-
-
-
-
diff --git a/channels/src/main/java/com/softwaremill/jox/Source.java b/channels/src/main/java/com/softwaremill/jox/Source.java
index cbbe576..f23f53b 100644
--- a/channels/src/main/java/com/softwaremill/jox/Source.java
+++ b/channels/src/main/java/com/softwaremill/jox/Source.java
@@ -1,5 +1,8 @@
package com.softwaremill.jox;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -59,4 +62,34 @@ default Source collectAsView(Function f) {
default Source filterAsView(Predicate p) {
return new CollectSource<>(this, t -> p.test(t) ? t : null);
}
+
+ // draining operations
+
+
+ /**
+ * Invokes the given function for each received element. Blocks until the channel is done.
+ *
+ * @throws ChannelErrorException When there is an upstream error.
+ */
+ default void forEach(Consumer c) throws InterruptedException {
+ var repeat = true;
+ while (repeat) {
+ switch (receiveOrClosed()) {
+ case ChannelDone cd -> repeat = false;
+ case ChannelError ce -> throw ce.toException();
+ case Object t -> c.accept((T) t);
+ }
+ }
+ }
+
+ /**
+ * Accumulates all elements received from the channel into a list. Blocks until the channel is done.
+ *
+ * @throws ChannelErrorException When there is an upstream error.
+ */
+ default List toList() throws InterruptedException {
+ var l = new ArrayList();
+ forEach(l::add);
+ return l;
+ }
}
diff --git a/channels/src/test/java/com/softwaremill/jox/SourceOpsForEachTest.java b/channels/src/test/java/com/softwaremill/jox/SourceOpsForEachTest.java
new file mode 100644
index 0000000..8a0da87
--- /dev/null
+++ b/channels/src/test/java/com/softwaremill/jox/SourceOpsForEachTest.java
@@ -0,0 +1,37 @@
+package com.softwaremill.jox;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertIterableEquals;
+
+public class SourceOpsForEachTest {
+ @Test
+ void testIterateOverSource() throws Exception {
+ var c = new Channel(10);
+ c.sendOrClosed(1);
+ c.sendOrClosed(2);
+ c.sendOrClosed(3);
+ c.doneOrClosed();
+
+ List r = new ArrayList<>();
+ c.forEach(v -> r.add(v));
+
+ assertIterableEquals(List.of(1, 2, 3), r);
+ }
+
+ @Test
+ void testConvertSourceToList() throws Exception {
+ var c = new Channel(10);
+ c.sendOrClosed(1);
+ c.sendOrClosed(2);
+ c.sendOrClosed(3);
+ c.doneOrClosed();
+
+ List resultList = c.toList();
+ assertEquals(List.of(1, 2, 3), resultList);
+ }
+}
diff --git a/pom.xml b/pom.xml
index b0ceafd..9ee2a54 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,7 +18,9 @@
channels
+ structuredbench
+ channel-ops
@@ -47,6 +49,44 @@
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.13.0
+
+ true
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 3.3.1
+
+ --enable-preview
+
+
+
+
+
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ 5.10.1
+
+
+ org.awaitility
+ awaitility
+ 4.2.0
+
+
+
+
diff --git a/structured/pom.xml b/structured/pom.xml
new file mode 100644
index 0000000..2445e33
--- /dev/null
+++ b/structured/pom.xml
@@ -0,0 +1,28 @@
+
+
+ 4.0.0
+
+
+ com.softwaremill.jox
+ parent
+ 0.2.1
+
+
+ structured
+ 0.2.1
+ jar
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ test
+
+
+ org.awaitility
+ awaitility
+ test
+
+
+
diff --git a/structured/src/main/java/com/softwaremill/jox/structured/CancellableFork.java b/structured/src/main/java/com/softwaremill/jox/structured/CancellableFork.java
new file mode 100644
index 0000000..35f0787
--- /dev/null
+++ b/structured/src/main/java/com/softwaremill/jox/structured/CancellableFork.java
@@ -0,0 +1,47 @@
+package com.softwaremill.jox.structured;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public interface CancellableFork extends Fork {
+ /**
+ * Interrupts the fork, and blocks until it completes with a result.
+ *
+ * @throws ExecutionException When the cancelled fork threw an exception.
+ */
+ T cancel() throws InterruptedException, ExecutionException;
+
+ /**
+ * Interrupts the fork, and returns immediately, without waiting for the fork to complete. Note that the enclosing scope will only
+ * complete once all forks have completed.
+ */
+ void cancelNow();
+}
+
+class CancellableForkUsingResult extends ForkUsingResult implements CancellableFork {
+ private final Semaphore done;
+ private final AtomicBoolean started;
+
+ CancellableForkUsingResult(CompletableFuture result, Semaphore done, AtomicBoolean started) {
+ super(result);
+ this.done = done;
+ this.started = started;
+ }
+
+ @Override
+ public T cancel() throws InterruptedException, ExecutionException {
+ cancelNow();
+ return join();
+ }
+
+ @Override
+ public void cancelNow() {
+ // will cause the scope to end, interrupting the task if it hasn't yet finished (or potentially never starting it)
+ done.release();
+ if (!started.getAndSet(true)) {
+ result.completeExceptionally(new InterruptedException("fork was cancelled before it started"));
+ }
+ }
+}
diff --git a/structured/src/main/java/com/softwaremill/jox/structured/Fork.java b/structured/src/main/java/com/softwaremill/jox/structured/Fork.java
new file mode 100644
index 0000000..369d7a0
--- /dev/null
+++ b/structured/src/main/java/com/softwaremill/jox/structured/Fork.java
@@ -0,0 +1,32 @@
+package com.softwaremill.jox.structured;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A fork started using {@link Scope#fork}, {@link Scope#forkUser}, {@link UnsupervisedScope#forkCancellable} or
+ * {@link UnsupervisedScope#forkUnsupervised}, backed by a (virtual) thread.
+ */
+public interface Fork {
+ /**
+ * Blocks until the fork completes with a result.
+ *
+ * @throws ExecutionException If the fork completed with an exception, and is unsupervised (started with
+ * {@link UnsupervisedScope#forkUnsupervised} or
+ * {@link UnsupervisedScope#forkCancellable}).
+ */
+ T join() throws InterruptedException, ExecutionException;
+}
+
+class ForkUsingResult implements Fork {
+ protected final CompletableFuture result;
+
+ ForkUsingResult(CompletableFuture result) {
+ this.result = result;
+ }
+
+ @Override
+ public T join() throws InterruptedException, ExecutionException {
+ return result.get();
+ }
+}
diff --git a/structured/src/main/java/com/softwaremill/jox/structured/Par.java b/structured/src/main/java/com/softwaremill/jox/structured/Par.java
new file mode 100644
index 0000000..46137ef
--- /dev/null
+++ b/structured/src/main/java/com/softwaremill/jox/structured/Par.java
@@ -0,0 +1,49 @@
+package com.softwaremill.jox.structured;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+
+import static com.softwaremill.jox.structured.Scopes.supervised;
+
+public class Par {
+ /**
+ * Runs the given computations in parallel. If any fails because of an exception, or if any returns an application
+ * error, other computations are interrupted. Then, the exception is re-thrown, or the error value returned.
+ */
+ public static List par(List> fs) throws ExecutionException, InterruptedException {
+ return supervised(scope -> {
+ var forks = fs.stream().map(f -> scope.fork(f)).toList();
+ var results = new ArrayList();
+ for (Fork fork : forks) {
+ results.add(fork.join());
+ }
+ return results;
+ });
+ }
+
+ /**
+ * Runs the given computations in parallel, with at most {@code parallelism} running in parallel at the same
+ * time. If any computation fails because of an exception, or if any returns an application error, other
+ * computations are interrupted. Then, the exception is re-thrown, or the error value returned.
+ */
+ public static List parLimit(int parallelism, List> fs) throws ExecutionException, InterruptedException {
+ return supervised(scope -> {
+ var s = new Semaphore(parallelism);
+ var forks = fs.stream().map(f -> scope.fork(() -> {
+ s.acquire();
+ var r = f.call();
+ // no try-finally as there's no point in releasing in case of an exception, as any newly started forks will be interrupted
+ s.release();
+ return r;
+ })).toList();
+ var results = new ArrayList();
+ for (Fork fork : forks) {
+ results.add(fork.join());
+ }
+ return results;
+ });
+ }
+}
diff --git a/structured/src/main/java/com/softwaremill/jox/structured/Race.java b/structured/src/main/java/com/softwaremill/jox/structured/Race.java
new file mode 100644
index 0000000..3d3278d
--- /dev/null
+++ b/structured/src/main/java/com/softwaremill/jox/structured/Race.java
@@ -0,0 +1,139 @@
+package com.softwaremill.jox.structured;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static com.softwaremill.jox.structured.Scopes.unsupervised;
+
+public class Race {
+ /**
+ * The result of computation {@code f}, if it took less than {@code millis} ms, and a {@link TimeoutException}
+ * otherwise.
+ *
+ * @throws TimeoutException If {@code f} took more than {@code millis}.
+ */
+ public static T timeout(long millis, Callable f) throws TimeoutException, ExecutionException, InterruptedException {
+ var result = raceResult(f::call, () -> {
+ Thread.sleep(millis);
+ return new Timeout();
+ });
+
+ if (result instanceof Timeout) {
+ throw new TimeoutException("Computation didn't finish within " + millis + "ms");
+ } else {
+ return (T) result;
+ }
+ }
+
+ /**
+ * Returns the result of the first computation to complete successfully, or if all fail - throws the first
+ * exception.
+ */
+ public static T race(Callable f1, Callable f2) throws ExecutionException, InterruptedException {
+ return race(List.of(f1, f2));
+ }
+
+ /**
+ * Returns the result of the first computation to complete successfully, or if all fail - throws the first
+ * exception.
+ */
+ public static T race(Callable f1, Callable f2, Callable f3) throws ExecutionException, InterruptedException {
+ return race(List.of(f1, f2, f3));
+ }
+
+ /**
+ * Returns the result of the first computation to complete successfully, or if all fail - throws the first
+ * exception.
+ */
+ public static T race(List> fs) throws ExecutionException, InterruptedException {
+ var exceptions = new ArrayDeque();
+
+ try {
+ return unsupervised(scope -> {
+ var branchResults = new ArrayBlockingQueue<>(fs.size());
+ fs.forEach(f -> {
+ scope.forkUnsupervised(() -> {
+ try {
+ var r = f.call();
+ if (r == null) {
+ branchResults.add(new NullWrapperInRace());
+ } else {
+ branchResults.add(r);
+ }
+ } catch (Exception e) {
+ branchResults.add(new ExceptionWrapperInRace(e));
+ }
+ return null;
+ });
+ });
+
+ var left = fs.size();
+ while (left > 0) {
+ var first = branchResults.take();
+ if (first instanceof ExceptionWrapperInRace ew) {
+ exceptions.add(ew.e);
+ } else if (first instanceof NullWrapperInRace) {
+ return null;
+ } else {
+ return (T) first;
+ }
+ left -= 1;
+ }
+
+ // if we get here, there must be an exception
+ throw exceptions.pollFirst();
+ });
+ } catch (ExecutionException e) {
+ while (!exceptions.isEmpty()) {
+ e.addSuppressed(exceptions.pollFirst());
+ }
+ throw e;
+ }
+ }
+
+
+ /**
+ * Returns the result of the first computation to complete (either successfully or with an exception).
+ */
+ public static T raceResult(Callable f1, Callable f2) throws ExecutionException, InterruptedException {
+ return raceResult(List.of(f1, f2));
+ }
+
+ /**
+ * Returns the result of the first computation to complete (either successfully or with an exception).
+ */
+ public static T raceResult(Callable f1, Callable f2, Callable f3) throws ExecutionException, InterruptedException {
+ return raceResult(List.of(f1, f2, f3));
+ }
+
+ /**
+ * Returns the result of the first computation to complete (either successfully or with an exception).
+ */
+ public static T raceResult(List> fs) throws ExecutionException, InterruptedException {
+ var result = race(fs.stream().>map(f -> () -> {
+ try {
+ return f.call();
+ } catch (Exception e) {
+ return new ExceptionWrapperInRaceResult(e);
+ }
+ }).toList());
+ if (result instanceof ExceptionWrapperInRaceResult ew) {
+ throw new ExecutionException(ew.e);
+ } else {
+ return (T) result;
+ }
+ }
+
+ private record NullWrapperInRace() {}
+
+ private record ExceptionWrapperInRace(Exception e) {}
+
+ private record ExceptionWrapperInRaceResult(Exception e) {}
+
+ private record Timeout() {}
+}
diff --git a/structured/src/main/java/com/softwaremill/jox/structured/Scope.java b/structured/src/main/java/com/softwaremill/jox/structured/Scope.java
new file mode 100644
index 0000000..9128732
--- /dev/null
+++ b/structured/src/main/java/com/softwaremill/jox/structured/Scope.java
@@ -0,0 +1,106 @@
+package com.softwaremill.jox.structured;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.StructuredTaskScope;
+
+/**
+ * Capability granted by an {@link Scopes#supervised(Scoped)} or {@link Scopes#unsupervised(ScopedUnsupervised)}
+ * concurrency scope.
+ *
+ * Represents a capability to fork supervised or unsupervised, asynchronously running computations in a concurrency
+ * scope. Such forks can be created using {@link Scope#fork}, {@link Scope#forkUser},
+ * {@link UnsupervisedScope#forkCancellable} or {@link UnsupervisedScope#forkUnsupervised}.
+ *
+ * @see ScopedUnsupervised
+ */
+public class Scope extends UnsupervisedScope {
+ private final StructuredTaskScope