Skip to content

Commit

Permalink
Make select clauses immutable & reusable (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw authored Jan 4, 2024
1 parent f891773 commit 00712c2
Show file tree
Hide file tree
Showing 6 changed files with 1,517 additions and 1,420 deletions.
62 changes: 31 additions & 31 deletions core/src/main/java/com/softwaremill/jox/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public Object sendSafe(T value) throws InterruptedException {

/**
* @return If {@code select} & {@code selectClause} is {@code null}: {@code null} when the value was sent, or
* {@link ChannelClosed}, when the channel is closed. Otherwise, might also return {@link StoredSelect}.
* {@link ChannelClosed}, when the channel is closed. Otherwise, might also return {@link StoredSelectClause}.
*/
private Object doSend(T value, SelectInstance select, SelectClause<?> selectClause) throws InterruptedException {
if (value == null) {
Expand Down Expand Up @@ -234,7 +234,7 @@ private Object doSend(T value, SelectInstance select, SelectClause<?> selectClau
// not cleaning the previous segments - the close procedure might still need it
return closedReason.get();
}
case StoredSelect ss -> {
case StoredSelectClause ss -> {
// we stored a select instance - there's no matching receive, not clearing the previous segment
return ss;
}
Expand All @@ -248,7 +248,7 @@ private Object doSend(T value, SelectInstance select, SelectClause<?> selectClau
* @param i The index within the {@code segment}.
* @param s Index of the reserved cell.
* @param value The value to send.
* @return One of {@link SendResult}, or {@link StoredSelect} if {@code select} is not {@code null}.
* @return One of {@link SendResult}, or {@link StoredSelectClause} if {@code select} is not {@code null}.
*/
private Object updateCellSend(Segment segment, int i, long s, T value, SelectInstance select, SelectClause<?> selectClause) throws InterruptedException {
while (true) {
Expand All @@ -262,7 +262,7 @@ private Object updateCellSend(Segment segment, int i, long s, T value, SelectIns
if (select != null) {
// cell is empty, no receiver, and we are in a select -> store the select instance
// and await externally; the value to send is stored in the selectClause
var storedSelect = new StoredSelect(select, segment, i, true, selectClause);
var storedSelect = new StoredSelectClause(select, segment, i, true, selectClause, value);
if (segment.casCell(i, state, storedSelect)) {
return storedSelect;
}
Expand Down Expand Up @@ -305,9 +305,14 @@ private Object updateCellSend(Segment segment, int i, long s, T value, SelectIns
return SendResult.FAILED;
}
}
case StoredSelect ss -> {
case StoredSelectClause ss -> {
// Setting the payload first, before the memory barrier created by potentially setting `SelectInstance.state`.
// The state is the read in select's main thread. Since we have this send-cell exclusively, no other thread
// will attempt to call `setPayload`.
ss.setPayload(value);

// a select clause is waiting -> trying to resume
if (ss.getSelect().trySelect(ss, value)) {
if (ss.getSelect().trySelect(ss)) {
segment.setCell(i, DONE);
return SendResult.RESUMED;
} else {
Expand Down Expand Up @@ -357,7 +362,7 @@ public Object receiveSafe() throws InterruptedException {

/**
* @return If {@code select} & {@code selectClause} is {@code null}: the received value, or {@link ChannelClosed},
* when the channel is closed. Otherwise, might also return {@link StoredSelect}.
* when the channel is closed. Otherwise, might also return {@link StoredSelectClause}.
*/
private Object doReceive(SelectInstance select, SelectClause<?> selectClause) throws InterruptedException {
while (true) {
Expand Down Expand Up @@ -404,7 +409,7 @@ private Object doReceive(SelectInstance select, SelectClause<?> selectClause) th
- awaiting on the continuation is interrupted, in which case the exception propagates outside of this method
- we stored the given select instance (in an empty / in-buffer cell)
*/
if (!(result instanceof StoredSelect)) {
if (!(result instanceof StoredSelectClause)) {
segment.cleanPrev();
}
if (result != ReceiveResult.FAILED) {
Expand All @@ -424,7 +429,7 @@ private Object doReceive(SelectInstance select, SelectClause<?> selectClause) th
* @param i The index within the {@code segment}.
* @param r Index of the reserved cell.
* @param select The select instance of which this receive is part of, or {@code null} (along with {@code selectClause}) if this is a direct receive call.
* @return Either a state-result ({@link ReceiveResult}), {@link StoredSelect} in case {@code select} is not {@code null}, or the received value.
* @return Either a state-result ({@link ReceiveResult}), {@link StoredSelectClause} in case {@code select} is not {@code null}, or the received value.
*/
private Object updateCellReceive(Segment segment, int i, long r, SelectInstance select, SelectClause<?> selectClause) throws InterruptedException {
while (true) {
Expand All @@ -437,7 +442,7 @@ private Object updateCellReceive(Segment segment, int i, long r, SelectInstance
if (select != null) {
// cell is empty, no sender, and we are in a select -> store the select instance
// and await externally
var storedSelect = new StoredSelect(select, segment, i, false, selectClause);
var storedSelect = new StoredSelectClause(select, segment, i, false, selectClause, null);
if (segment.casCell(i, state, storedSelect)) {
expandBuffer();
return storedSelect;
Expand Down Expand Up @@ -485,14 +490,14 @@ private Object updateCellReceive(Segment segment, int i, long r, SelectInstance
}
// else: CAS unsuccessful, repeat
}
case StoredSelect ss -> {
case StoredSelectClause ss -> {
// resolving a potential race with `expandBuffer`
if (segment.casCell(i, state, RESUMING)) {
// a send clause is waiting -> trying to resume
if (ss.getSelect().trySelect(ss, 0)) {
if (ss.getSelect().trySelect(ss)) {
segment.setCell(i, DONE);
expandBuffer();
return ss.getClause().getPayload();
return ss.getPayload();
} else {
// when select fails (another clause is selected, select is interrupted, closed etc.) -> trying with a new one
// the state will be set to INTERRUPTED_SEND by the cleanup, meanwhile everybody else will observe RESUMING
Expand Down Expand Up @@ -596,11 +601,11 @@ private ExpandBufferResult updateCellExpandBuffer(Segment segment, int i) {
// must be a receiver continuation - another buffer expansion already happened
return ExpandBufferResult.DONE;
}
case StoredSelect ss when ss.isSender() -> {
case StoredSelectClause ss when ss.isSender() -> {
if (segment.casCell(i, state, RESUMING)) {
// a send clause is waiting -> trying to resume
if (ss.getSelect().trySelect(ss, 0)) {
segment.setCell(i, new Buffered(ss.getClause().getPayload()));
if (ss.getSelect().trySelect(ss)) {
segment.setCell(i, new Buffered(ss.getPayload()));
return ExpandBufferResult.DONE;
} else {
// select unsuccessful -> trying with a new one
Expand All @@ -610,7 +615,7 @@ private ExpandBufferResult updateCellExpandBuffer(Segment segment, int i) {
}
// else: CAS unsuccessful, repeat
}
case StoredSelect ss -> {
case StoredSelectClause ss -> {
// must be a receiver clause of the select - another buffer expansion already happened
return ExpandBufferResult.DONE;
}
Expand Down Expand Up @@ -815,7 +820,7 @@ private void updateCellClose(Segment segment, int i) {
return;
}
}
case StoredSelect ss -> {
case StoredSelectClause ss -> {
ss.getSelect().channelClosed(closedReason.get());
// not setting the state & updating counters, as each non-selected stored select cell will be
// cleaned up, setting an interrupted state (and informing the segment)
Expand Down Expand Up @@ -872,7 +877,7 @@ public SelectClause<T> receiveClause() {
* the current channel, and transform it using the provided {@code callback}.
*/
public <U> SelectClause<U> receiveClause(Function<T, U> callback) {
return new SelectClause<>(null) {
return new SelectClause<>() {
@Override
Channel<?> getChannel() {
return Channel.this;
Expand All @@ -889,9 +894,9 @@ Object register(SelectInstance select) {
}

@Override
U transformedRawValue() {
U transformedRawValue(Object rawValue) {
//noinspection unchecked
return callback.apply((T) getPayload());
return callback.apply((T) rawValue);
}
};
}
Expand All @@ -909,7 +914,7 @@ public SelectClause<Void> sendClause(T value) {
* to the current channel, and return the value of the provided callback as the clause's result.
*/
public <U> SelectClause<U> sendClause(T value, Supplier<U> callback) {
return new SelectClause<>(value) {
return new SelectClause<>() {
@Override
Channel<?> getChannel() {
return Channel.this;
Expand All @@ -928,18 +933,13 @@ Object register(SelectInstance select) {
}

@Override
U transformedRawValue() {
U transformedRawValue(Object rawValue) {
return callback.get();
}

@Override
void setPayload(Object payload) {
// ignoring, the payload is set during creation
}
};
}

void cleanupStoredSelect(Segment segment, int i, boolean isSender) {
void cleanupStoredSelectClause(Segment segment, int i, boolean isSender) {
// We treat the cell as if it was interrupted - the code is same as in `Continuation.await`;
// there's no need to resolve races with `SelectInstance.trySelect`, as cleanup is called either when a clause
// is selected, a channel is closed, or during re-registration. In all cases `trySelect` would fail.
Expand Down Expand Up @@ -1007,8 +1007,8 @@ public String toString() {
case Buffered b -> sb.append("V(").append(b.value()).append(")");
case Continuation c when c.isSender() -> sb.append("WS(").append(c.getPayload()).append(")");
case Continuation c -> sb.append("WR");
case StoredSelect ss when ss.isSender() -> sb.append("SS");
case StoredSelect ss -> sb.append("SR");
case StoredSelectClause ss when ss.isSender() -> sb.append("SS");
case StoredSelectClause ss -> sb.append("SR");
default -> throw new IllegalStateException("Unexpected value: " + state);
}
if (i != Segment.SEGMENT_SIZE - 1) sb.append(",");
Expand Down
Loading

1 comment on commit 00712c2

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 1.20.

Benchmark suite Current: 00712c2 Previous: 109707a Ratio
com.softwaremill.jox.SelectBenchmark.channel 395.73945917707744 ns/op 222.0338600426866 ns/op 1.78

This comment was automatically generated by workflow using github-action-benchmark.

CC: @adamw

Please sign in to comment.