Skip to content

Commit

Permalink
Add structured concurrency module (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw authored Jul 18, 2024
1 parent 2227ef2 commit 06a6907
Show file tree
Hide file tree
Showing 30 changed files with 2,231 additions and 64 deletions.
304 changes: 284 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
# jox
# Jox

[![Ideas, suggestions, problems, questions](https://img.shields.io/badge/Discourse-ask%20question-blue)](https://softwaremill.community/c/open-source/11)
[![CI](https://github.com/softwaremill/jox/workflows/CI/badge.svg)](https://github.com/softwaremill/jox/actions?query=workflow%3A%22CI%22)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.softwaremill.jox/core/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.softwaremill.jox/core)
[![javadoc](https://javadoc.io/badge2/com.softwaremill.jox/core/javadoc.svg)](https://javadoc.io/doc/com.softwaremill.jox/core)

Fast and Scalable Channels in Java. Designed to be used with Java 21+ and virtual threads,
see [Project Loom](https://openjdk.org/projects/loom/) (although the `core` module can be used with Java 17+).
Modern concurrency for Java 21+ (backed by virtual threads, see [Project Loom](https://openjdk.org/projects/loom/)).
Includes:

Inspired by the "Fast and Scalable Channels in Kotlin Coroutines" [paper](https://arxiv.org/abs/2211.04986), and
the [Kotlin implementation](https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt).
* Fast and Scalable Channels in Java. Inspired by the "Fast and Scalable Channels in Kotlin Coroutines"
[paper](https://arxiv.org/abs/2211.04986), and
the [Kotlin implementation](https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt).
* Programmer-friendly structured concurrency
* Blocking, synchronous, functional streaming operators

JavaDocs can be browsed
at [https://javadoc.io](https://www.javadoc.io/doc/com.softwaremill.jox/core/latest/com.softwaremill.jox/com/softwaremill/jox/package-summary.html).
Expand All @@ -25,34 +28,38 @@ Videos:
* [A 10-minute introduction to Jox](https://www.youtube.com/watch?v=Ss9b1HpPDt0)
* [Passing control information through channels](https://www.youtube.com/watch?v=VjiCzaiRro8)

## Dependencies
For a Scala version, see the [Ox project](https://github.com/softwaremill/ox).

## Table of contents

* [Channels](#channels)
* [Structured concurrency](#structured-concurrency)
* [Streaming](#streaming)

## Channels

### Dependency

Maven:

```xml

<dependency>
<groupId>com.softwaremill.jox</groupId>
<artifactId>core</artifactId>
<artifactId>channels</artifactId>
<version>0.2.1</version>
</dependency>
```

Gradle:

```groovy
implementation 'com.softwaremill.jox:core:0.2.1'
```

SBT:

```scala
libraryDependencies += "com.softwaremill.jox" % "core" % "0.2.1"
implementation 'com.softwaremill.jox:channels:0.2.1'
```

## Usage
### Usage

### Rendezvous channel
#### Rendezvous channel

```java
import com.softwaremill.jox.Channel;
Expand Down Expand Up @@ -84,7 +91,7 @@ class Demo1 {
}
```

### Buffered channel
#### Buffered channel

```java
import com.softwaremill.jox.Channel;
Expand Down Expand Up @@ -113,7 +120,7 @@ class Demo2 {

Unlimited channels can be created with `Channel.newUnlimitedChannel()`. Such channels will never block on send().

### Closing a channel
#### Closing a channel

Channels can be closed, either because the source is `done` with sending values, or when there's an `error` while
the sink processes the received values.
Expand Down Expand Up @@ -144,7 +151,7 @@ class Demo3 {
}
```

### Selecting from multiple channels
#### Selecting from multiple channels

The `select` method selects exactly one clause to complete. For example, you can receive a value from exactly one
channel:
Expand Down Expand Up @@ -224,7 +231,7 @@ class Demo6 {
}
```

## Performance
### Performance

The project includes benchmarks implemented using JMH - both for the `Channel`, as well as for some built-in Java
synchronisation primitives (queues), as well as the Kotlin channel implementation.
Expand Down Expand Up @@ -314,6 +321,263 @@ ParallelKotlinBenchmark.parallelChannels_defaultDispatcher 16
ChainedKotlinBenchmark.channelChain_defaultDispatcher 16 10000 avgt 20 6.039 ± 0.826 ns/op
```

## Structured concurrency

### Dependency

Maven:

```xml

<dependency>
<groupId>com.softwaremill.jox</groupId>
<artifactId>structured</artifactId>
<version>0.2.1</version>
</dependency>
```

Gradle:

```groovy
implementation 'com.softwaremill.jox:structured:0.2.1'
```

### Usage

#### Creating scopes and forking computations

```java
import java.util.concurrent.ExecutionException;

import static com.softwaremill.jox.structured.Scopes.supervised;

public class Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
var result = supervised(scope -> {
var f1 = scope.fork(() -> {
Thread.sleep(500);
return 5;
});
var f2 = scope.fork(() -> {
Thread.sleep(1000);
return 6;
});
return f1.join() + f2.join();
});
System.out.println("result = " + result);
}
}
```

* the `supervised` scope will only complete once any forks started within complete as well
* in other words, it's guaranteed that no forks will remain running, after a `supervised` block completes
* `fork` starts a concurrently running computation, which can be joined in a blocking way. These computatioins are
backed by virtual threads

#### Error handling in scopes

```java
import java.util.concurrent.ExecutionException;

import static com.softwaremill.jox.structured.Scopes.supervised;

public class Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
var result = supervised(scope -> {
var f1 = scope.fork(() -> {
Thread.sleep(1000);
return 6;
});
var f2 = scope.<Integer>fork(() -> {
Thread.sleep(500);
throw new RuntimeException("I can’t count to 5!");
});
return f1.join() + f2.join();
});
System.out.println("result = " + result);
}
}
```

* an exception thrown from the scope's body, or from any of the forks, causes the scope to end
* any forks that are still running are then interrupted
* once all forks complete, an `ExecutionException` is thrown by the `supervised` method
* the cause of the `ExecutionException` is the original exception
* any other exceptions (e.g. `InterruptedExceptions`) that have been thrown while ending the scope, are added as
suppressed

Jox implements the "let it crash" model. When an error occurs, the entire scope ends, propagating the exception higher,
so that it can be properly handled. Moreover, no detail is lost: all exceptions are preserved, either as causes, or
suppressed exceptions.

#### Other types of scopes & forks

There are 4 types of forks:

* `fork`: daemon fork, supervised; when the scope's body ends, such forks are interrupted
* `forkUser`: user fork, supervised; when the scope's body ends, the scope's method waits until such a fork completes
normally
* `forkUnsupervised`: daemon fork, unsupervised; any thrown exceptions don't cause the scope to end, but instead can be
discovered when the fork is `.join`ed
* `forkCancellable`: daemon fork, unsupervised, which can be manually cancelled (interrupted)

There are also 2 types of scopes:

* `supervised`: the default scope, which ends when all forks user forks complete successfully, or when there's any
exception in supervised scopes
* `unsupervised`: a scope where only unsupervised forks can be started

#### Running computations in parallel

```java
import java.util.List;
import java.util.concurrent.ExecutionException;

import static com.softwaremill.jox.structured.Par.par;

public class Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
var result = par(List.of(() -> {
Thread.sleep(500);
return 5;
}, () -> {
Thread.sleep(1000);
return 6;
}));
System.out.println("result = " + result);
}
}
// result = [5, 6]
```

Uses `supervised` scopes underneath.

#### Racing computations

```java
import java.util.concurrent.ExecutionException;

import static com.softwaremill.jox.structured.Race.race;

public class Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
var result = race(() -> {
Thread.sleep(1000);
return 10;
}, () -> {
Thread.sleep(500);
return 5;
});
// result will be 5, the other computation will be interrupted on the Thread.sleep
System.out.println("result = " + result);
}
}
// result = 5
```

#### Timing out a computation

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static com.softwaremill.jox.structured.Race.timeout;

public class Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
var result = timeout(1000, () -> {
Thread.sleep(500);
return 5;
});
System.out.println("result = " + result);
}
}
// result = 5
```

## Streaming

### Dependency

Maven:

```xml

<dependency>
<groupId>com.softwaremill.jox</groupId>
<artifactId>channel-ops</artifactId>
<version>0.2.1</version>
</dependency>
```

Gradle:

```groovy
implementation 'com.softwaremill.jox:channel-ops:0.2.1'
```

### Usage

Using this module you can run operations on streams which require starting background threads. To do that,
you need to pass an active concurrency scope (started using `supervised`) to the `SourceOps` constructor.

Each method from `SourceOps` causes a new fork (virtual thread) to be started, which starts running its logic
immediately (producing elements / consuming and transforming elements from the given source). Thus, this is an
implementation of "hot streams".

#### Creating streams

Sources from iterables, or tick-sources, can be created by calling methods on `SourceOps`:

```java
import java.util.concurrent.ExecutionException;

import static com.softwaremill.jox.structured.Scopes.supervised;

public class Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
supervised(scope -> {
new SourceOps(scope)
.tick(500, "tick")
.toSource()
.forEach(v -> System.out.println(v));
return null; // unreachable, as `tick` produces infinitely many elements
});
}
}
```

A tick-source can also be used in the usual way, by calling `.receive` on it, or by using it in `select`'s clauses.

#### Transforming streams

Streams can be transformed by calling the appropriate methods on the object returned by
`SourceOps.forSource(scope, source)`.

`collect` combines the functionality of `map` and `filter`: elements are mapped, and when the mapping function returns
`null`, the element is skipped:

```java
import java.util.List;
import java.util.concurrent.ExecutionException;

import static com.softwaremill.jox.structured.Scopes.supervised;

public class Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
var result = supervised(scope -> new SourceOps(scope)
.fromIterable(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.collect(n -> {
if (n % 2 == 0) return null;
else return n * 10;
})
.toSource().toList());
System.out.println("result = " + result);
}
}
// result = [10, 30, 50, 70, 90]
```

## Feedback

Is what we are looking for!
Expand Down
10 changes: 0 additions & 10 deletions bench/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,6 @@
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<compilerArgs>
<arg>--enable-preview</arg>
</compilerArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
Expand Down
Loading

0 comments on commit 06a6907

Please sign in to comment.