Skip to content

Commit

Permalink
Readme
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Feb 4, 2025
1 parent afd9b52 commit c53e153
Showing 1 changed file with 20 additions and 25 deletions.
45 changes: 20 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@
[![javadoc](https://javadoc.io/badge2/com.softwaremill.jox/channels/javadoc.svg)](https://javadoc.io/doc/com.softwaremill.jox/channels)

Modern concurrency for Java 21 (backed by virtual threads, see [Project Loom](https://openjdk.org/projects/loom/)).
Requires JDK 21.
Includes:

* Fast and Scalable Channels in Java. Inspired by the "Fast and Scalable Channels in Kotlin Coroutines"
[paper](https://arxiv.org/abs/2211.04986), and
the [Kotlin implementation](https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt).
* Programmer-friendly structured concurrency
* Blocking, synchronous, functional streaming operators
* Blocking, synchronous, functional streaming

JavaDocs can be browsed
at [https://javadoc.io](https://www.javadoc.io/doc/com.softwaremill.jox/core/latest/com.softwaremill.jox/com/softwaremill/jox/package-summary.html).
Expand All @@ -35,7 +34,6 @@ For a Scala version, see the [Ox project](https://github.com/softwaremill/ox).

* [Channels](#channels)
* [Structured concurrency](#structured-concurrency)
* [Streaming](#streaming)
* [Flows](#lazy-streaming---flows)

## Channels
Expand Down Expand Up @@ -343,6 +341,8 @@ ChainedKotlinBenchmark.channelChain_defaultDispatcher 16

## Structured concurrency

Requires the current LTS release of Java - JDK 21 (won't work with newer versions).

### Dependency

Maven:
Expand Down Expand Up @@ -431,17 +431,14 @@ so that it can be properly handled. Moreover, no detail is lost: all exceptions
suppressed exceptions.

As `JoxScopeExecutionException` is unchecked, we introduced utility method called
`JoxScopeExecutionException#unwrapAndThrow`.
If the wrapped exception is instance of any of passed classes, this method unwraps original exception and throws it as
checked exception, `throws` signature forces exception handling.
If the wrapped exception is not instance of any of passed classes, **nothing happens**.
All suppressed exceptions are rewritten from `JoxScopeExecutionException`
`JoxScopeExecutionException#unwrapAndThrow`. If the wrapped exception is an instance of any of the passed classes, this
method unwraps original exception and throws it as checked exception; then the `throws` signature forces exception
handling. If the wrapped exception is not instance of any of the passed classes, **nothing happens**. All suppressed
exceptions from `JoxScopeExecutionException` are added as suppressed to the unwrapped one.

**Note** `throws` signature points to the closest super class of passed arguments.
Method does **not** rethrow `JoxScopeExecutionException` by default.
So it is advised to manually rethrow it after calling `unwrapAndThrow` method.

e.g.
So it is advised to manually rethrow it after calling `unwrapAndThrow` method, e.g.:

```java
import com.softwaremill.jox.structured.JoxScopeExecutionException;
Expand Down Expand Up @@ -651,10 +648,10 @@ The `FlowEmit` instance is used to emit elements by the flow, that is process th
pipeline. This method only completes once the element is fully processed, and it might throw exceptions in case there's
a processing error.

As part of the callback, you can create `Scope`, fork background computations or run other flows asynchronously.
As part of the callback, you can create a `Scope`, fork background computations or run other flows asynchronously.
However, take care **not** to share the `FlowEmit` instance across threads. That is, instances of `FlowEmit` are
thread-unsafe and should only be used on the calling thread.
The lifetime of `FlowEmit` should not extend over the duration of the invocation of `usingEmit`.
thread-unsafe and should only be used on the calling thread. The lifetime of `FlowEmit` should not extend over the
duration of the invocation of `usingEmit`.

Any asynchronous communication should be best done with `Channel`s. You can then manually forward any elements received
from a channel to `emit`, or use e.g. `FlowEmit.channelToEmit`.
Expand Down Expand Up @@ -727,18 +724,17 @@ Processes the elements one-by-one on the thread that is invoking the run method.
### Transforming flows: concurrency

A number of flow transformations introduces asynchronous boundaries. For example,
`.mapPar(int parallelism, Function<T,U> mappingFunction)` describes a flow,
which runs the pipeline defined so far in the background, emitting elements to a `channel`. Another `fork` reads these
elements and runs up to `parallelism` invocations of `mappingFunction` concurrently. Mapped elements are then emitted by
the returned flow.
`.mapPar(int parallelism, Function<T,U> mappingFunction)` describes a flow, which runs the pipeline defined so far in
the background, emitting elements to a `channel`. Another `fork` reads these elements and runs up to `parallelism`
invocations of `mappingFunction` concurrently. Mapped elements are then emitted by the returned flow.

Behind the scenes, a new concurrency `Scope` is created along with a number of forks. In case of any exceptions,
everything is cleaned up before the flow propagates the exceptions. The `.mapPar` logic ensures that any exceptions from
the preceding pipeline are propagated through the channel.

Some other stages which introduce concurrency include `.merge`, `.interleave`, `.groupedWithin` and `I/O` stages. The
created channels serve as buffers between the pipeline stages, and their capacity is defined by the `ScopedValue`
`Channel.BUFFER_SIZE` in the scope, or default `Channel.DEFAULT_BUFFER_SIZE` is used.
`Flow.CHANNEL_BUFFER_SIZE` in the scope, or default `Channel.DEFAULT_BUFFER_SIZE` is used.

Explicit asynchronous boundaries can be inserted using `.buffer()`. This might be useful if producing the next element
to emit, and consuming the previous should run concurrently; or if the processing times of the consumer varies, and the
Expand Down Expand Up @@ -776,11 +772,10 @@ the pipeline described by the flow, and emits its elements onto the returned cha
### Text transformations and I/O operations

For smooth operations on `byte[]`, we've created a wrapper class `ByteChunk`. And for smooth type handling we created a
dedicated `ByteFlow`, a subtype of `Flow<ByteChunk>`.
To be able to utilize text and I/O operations, you need to create or transform into `ByteFlow`. It can be created via
`Flows.fromByteArray` or `Flows.fromByteChunk`.
`Flow` containing `byte[]` or `ByteChunk` can be transformed by using `toByteFlow()` method. Any other flow can be
transformed by using `toByteFlow()` with mapping function.
dedicated `ByteFlow`, a subtype of `Flow<ByteChunk>`. To be able to utilize text and I/O operations, you need to create
or transform into `ByteFlow`. It can be created via `Flows.fromByteArray` or `Flows.fromByteChunk`. `Flow` containing
`byte[]` or `ByteChunk` can be transformed by using `toByteFlow()` method. Any other flow can be transformed by using
`toByteFlow()` with mapping function.

#### Text operations

Expand Down Expand Up @@ -858,4 +853,4 @@ We offer commercial development services. [Contact us](https://softwaremill.com)

## Copyright

Copyright (C) 2023-2024 SoftwareMill [https://softwaremill.com](https://softwaremill.com).
Copyright (C) 2023-2025 SoftwareMill [https://softwaremill.com](https://softwaremill.com).

0 comments on commit c53e153

Please sign in to comment.