Skip to content

Commit

Permalink
Receive selects
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Dec 29, 2023
1 parent 853479e commit 0e8f391
Show file tree
Hide file tree
Showing 4 changed files with 601 additions and 37 deletions.
214 changes: 177 additions & 37 deletions core/src/main/java/com/softwaremill/jox/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Function;
import java.util.stream.Stream;

import static com.softwaremill.jox.CellState.*;
Expand Down Expand Up @@ -121,7 +122,9 @@ public Channel() {
* Capacity cannot be negative.
*/
public Channel(int capacity) {
assert capacity >= 0 : "Capacity must be non-negative.";
if (capacity < 0) {
throw new IllegalArgumentException("Capacity must be non-negative.");
}

this.capacity = capacity;
isRendezvous = capacity == 0L;
Expand Down Expand Up @@ -277,7 +280,18 @@ private SendResult updateCellSend(Segment segment, int i, long s, T value) throw
segment.setCell(i, DONE);
return SendResult.RESUMED;
} else {
// cell interrupted -> trying with a new one
// when cell interrupted -> trying with a new one
// when close in progress -> subsequent cells are already closed, this will be detected in the next iteration
return SendResult.FAILED;
}
}
case StoredSelect ss -> {
// a select clause is waiting -> trying to resume
if (ss.getSelect().trySelect(ss, value)) {
segment.setCell(i, DONE);
return SendResult.RESUMED;
} else {
// select unsuccessful -> trying with a new one
return SendResult.FAILED;
}
}
Expand Down Expand Up @@ -318,6 +332,14 @@ public T receive() throws InterruptedException {
* @return Either a value of type {@code T}, or {@link ChannelClosed}, when the channel is closed.
*/
public Object receiveSafe() throws InterruptedException {
return doReceive(null, null);
}

/**
* @return If {@code select} & {@code selectClause} is {@code null}: the received value, or {@link ChannelClosed},
* when the channel is closed. Otherwise, might also return {@link StoredSelect}.
*/
private Object doReceive(SelectInstance select, SelectClause<?> selectClause) throws InterruptedException {
while (true) {
// reading the segment before the counter increment - this is needed to find the required segment later
var segment = receiveSegment.get();
Expand All @@ -344,23 +366,27 @@ public Object receiveSafe() throws InterruptedException {
}
}

var result = updateCellReceive(segment, i, r);
var result = updateCellReceive(segment, i, r, select, selectClause);
if (result == ReceiveResult.CLOSED) {
// not cleaning the previous segments - the close procedure might still need it
return closedReason.get();
} else {
/*
After `updateCellReceive` completes and the channel isn't closed, we can be sure that S > r:
After `updateCellReceive` completes and the channel isn't closed, we can be sure that S > r, unless
we stored the given select instance:
- if we stored and awaited a continuation, and it was resumed, then a sender must have appeared
- if we marked the cell as broken, then a sender is in progress in that cell
- if a continuation was present, then the sender must have been there
- if the cell was interrupted, that could have been only because of a sender
- if a value was buffered, that's because there was/is a matching sender
The only case when S < r is when awaiting on the continuation is interrupted, in which case the
exception propagates outside of this method.
The only cases when S <= r are when:
- awaiting on the continuation is interrupted, in which case the exception propagates outside of this method
- we stored the given select instance (in an empty / in-buffer cell)
*/
segment.cleanPrev();
if (!(result instanceof StoredSelect)) {
segment.cleanPrev();
}
if (result != ReceiveResult.FAILED) {
return result;
}
Expand All @@ -371,33 +397,47 @@ public Object receiveSafe() throws InterruptedException {
/**
* Invariant maintained by receive + expandBuffer: between R and B the number of cells that are empty / IN_BUFFER should be equal
* to the buffer size. These are the cells that can accept a sender without suspension.
* <p>
* This method might suspend (and be interrupted) only if {@code select} is {@code null}.
*
* @param segment The segment which stores the cell's state.
* @param i The index within the {@code segment}.
* @param r Index of the reserved cell.
* @return Either a state-result ({@link ReceiveResult}), or the received value.
* @param select The select instance of which this receive is part of, or {@code null} (along with {@code selectClause}) if this is a direct receive call.
* @return Either a state-result ({@link ReceiveResult}), {@link StoredSelect} in case {@code select} is not {@code null}, or the received value.
*/
private Object updateCellReceive(Segment segment, int i, long r) throws InterruptedException {
private Object updateCellReceive(Segment segment, int i, long r, SelectInstance select, SelectClause<?> selectClause) throws InterruptedException {
while (true) {
var state = segment.getCell(i); // reading the current state of the cell; we'll try to update it atomically
var switchState = state == null ? IN_BUFFER : state; // we can't combine null+IN_BUFFER in the switch statement, hence cheating a bit here

switch (switchState) {
case IN_BUFFER -> { // means that state == null || state == IN_BUFFER
if (r >= getSendersCounter(sendersAndClosedFlag.get())) { // reading the sender's counter
// cell is empty, and no sender -> suspend
// not using any payload
var c = new Continuation(null);
if (segment.casCell(i, state, c)) {
expandBuffer();
var result = c.await(segment, i);
if (result == ChannelClosedMarker.CLOSED) {
return ReceiveResult.CLOSED;
} else {
return result;
if (select != null) {
// cell is empty, no sender, and we are in a select -> store the select instance
// and await externally
var storedSelect = new StoredSelect(select, segment, i, false, selectClause);
if (segment.casCell(i, state, storedSelect)) {
expandBuffer();
return storedSelect;
}
// else: CAS unsuccessful, repeat
} else {
// cell is empty, and no sender -> suspend
// not using any payload
var c = new Continuation(null);
if (segment.casCell(i, state, c)) {
expandBuffer();
var result = c.await(segment, i);
if (result == ChannelClosedMarker.CLOSED) {
return ReceiveResult.CLOSED;
} else {
return result;
}
}
// else: CAS unsuccessful, repeat
}
// else: CAS unsuccessful, repeat
} else {
// sender in progress, receiver changed state first -> restart
if (segment.casCell(i, state, BROKEN)) {
Expand All @@ -408,20 +448,24 @@ private Object updateCellReceive(Segment segment, int i, long r) throws Interrup
}
}
case Continuation c -> {
// resolving a potential race with `expandBuffer`
if (segment.casCell(i, state, RESUMING)) {
// a sender is waiting -> trying to resume
if (c.tryResume(0)) {
segment.setCell(i, DONE);
expandBuffer();
return c.getPayload();
} else {
// cell interrupted -> trying with a new one
// when cell interrupted -> trying with a new one
// the state will be set to INTERRUPTED_SEND by the continuation, meanwhile everybody else will observe RESUMING
// when close in progress -> the cell state will be updated to CLOSED, subsequent cells are already closed,
// which will be detected in the next iteration
return ReceiveResult.FAILED;
}
}
// else: CAS unsuccessful, repeat
}
// TODO: StoredSelect
case Buffered b -> {
segment.setCell(i, DONE);
expandBuffer();
Expand Down Expand Up @@ -487,7 +531,7 @@ private void expandBuffer() {
// the cell is already counted as processed by the close procedure
return;
}
// else, the cell must have been an interrupted sender; `Continuation` the properly notifies the segment
// else, the cell must have been an interrupted sender; `Continuation` properly notifies the segment
}
}

Expand All @@ -503,8 +547,10 @@ private ExpandBufferResult updateCellExpandBuffer(Segment segment, int i) {
segment.setCell(i, new Buffered(c.getPayload()));
return ExpandBufferResult.DONE;
} else {
// cell interrupted -> trying with a new one
// when cell interrupted -> trying with a new one
// the state will be set to INTERRUPTED_SEND by the continuation, meanwhile everybody else will observe RESUMING
// when close in progress -> the cell state will be updated to CLOSED, subsequent cells are already closed,
// which will be detected in the next iteration
return ExpandBufferResult.FAILED;
}
}
Expand All @@ -514,6 +560,26 @@ private ExpandBufferResult updateCellExpandBuffer(Segment segment, int i) {
// must be a receiver continuation - another buffer expansion already happened
return ExpandBufferResult.DONE;
}
case StoredSelect ss when ss.isSender() -> {
// TODO
throw new UnsupportedOperationException();
// if (segment.casCell(i, state, RESUMING)) {
// // a sender is waiting -> trying to resume
// if (ss.getSelect().trySelect(ss,0)) {
// segment.setCell(i, new Buffered(c.getPayload()));
// return ExpandBufferResult.DONE;
// } else {
// // select unsuccessful -> trying with a new one
// // the state will be set to INTERRUPTED_SEND by the continuation, meanwhile everybody else will observe RESUMING
// return ExpandBufferResult.FAILED;
// }
// }
// // else: CAS unsuccessful, repeat
}
case StoredSelect ss -> {
// must be a receiver clause of the select - another buffer expansion already happened
return ExpandBufferResult.DONE;
}
case Buffered b -> {
// an element is already buffered; if the ordering of operations was different, we would put IN_BUFFER in that cell and finish
return ExpandBufferResult.DONE;
Expand Down Expand Up @@ -697,17 +763,30 @@ private void updateCellClose(Segment segment, int i) {
}
}
case Continuation c -> {
if (segment.casCell(i, state, RESUMING)) {
if (c.tryResume(ChannelClosedMarker.CLOSED)) {
segment.setCell(i, CLOSED);
segment.cellInterruptedSender_orClosed();
return;
} else {
// cell interrupted - the segment counters will be appropriately decremented from the
// continuation, depending if this is a sender or receiver; moreover, the cell is already
// processed in case this is a receiver
return;
}
// potential race wih sender/receiver resuming the continuation - resolved by synchronizing on
// `Continuation.data`: only one thread will successfully change its value from `null`
if (c.tryResume(ChannelClosedMarker.CLOSED)) {
segment.setCell(i, CLOSED);
segment.cellInterruptedSender_orClosed();
return;
} else {
// when cell interrupted - the segment counters will be appropriately decremented from the
// continuation, depending on if this is a sender or receiver; moreover, the cell is already
// processed in case this is a receiver
// otherwise, the cell might be completed with a value, and the result is processed normally
// on another thread
return;
}
}
case StoredSelect ss -> {
if (ss.getSelect().channelClosed(closedReason.get())) {
segment.setCell(i, CLOSED);
segment.cellInterruptedSender_orClosed();
return;
} else {
// the select hasn't been closed; instead, it will clean up all cells as part of handling the
// non-closed state in its main loop
return;
}
}
case DONE, BROKEN -> {
Expand Down Expand Up @@ -742,6 +821,63 @@ public Throwable isError() {
}
}

// **************
// Select clauses
// **************

private static final Function<Object, Object> IDENTITY = Function.identity();

public SelectClause<T> receiveClause() {
//noinspection unchecked
return receiveClauseMap((Function<T, T>) IDENTITY);
}

public <U> SelectClause<U> receiveClauseMap(Function<T, U> callback) {
return new SelectClause<>() {
@Override
Channel<?> getChannel() {
return Channel.this;
}

@Override
Object register(SelectInstance select) {
try {
return doReceive(select, this);
} catch (InterruptedException e) {
// not possible, as we provide a select, so no suspension should happen
throw new IllegalStateException(e);
}
}

@Override
U transformedRawValue() {
//noinspection unchecked
return callback.apply((T) rawValue);
}
};
}
//
// public SelectClause<Void> sendClause(T value) {
// return sendClauseMap(value, () -> null);
// }
//
// public <U> SelectClause<U> sendClauseMap(T value, Supplier<U> callback) {}

void disposeStoredSelect(Segment segment, int i, boolean isSender) {
// We treat the cell as if it was interrupted - the code is same as in `Continuation.await`;
// there's no need to resolve races with `SelectInstance.trySelect`, as disposal is called either when a clause
// is selected, a channel is closed, or during re-registration. In all cases `trySelect` would fail.
// In other words, the races are resolved by synchronizing on `SelectInstance.state`.
segment.setCell(i, isSender ? INTERRUPTED_SEND : INTERRUPTED_RECEIVE);

// notifying the segment - if all cells become interrupted, the segment can be removed
if (isSender) {
segment.cellInterruptedSender_orClosed();
} else {
segment.cellInterruptedReceiver();
}
}

// ****
// Misc
// ****
Expand Down Expand Up @@ -795,6 +931,8 @@ public String toString() {
case Buffered b -> sb.append("V(").append(b.value()).append(")");
case Continuation c when c.isSender() -> sb.append("WS(").append(c.getPayload()).append(")");
case Continuation c -> sb.append("WR");
case StoredSelect ss when ss.isSender() -> sb.append("SS");
case StoredSelect ss -> sb.append("SR");
default -> throw new IllegalStateException("Unexpected value: " + state);
}
if (i != Segment.SEGMENT_SIZE - 1) sb.append(",");
Expand All @@ -818,13 +956,15 @@ enum SendResult {
}

/**
* Possible return values of {@code Channel#updateCellReceive}: one of the enum constants below, or the received value.
* Possible return values of {@code Channel#updateCellReceive}: one of the enum constants below, the received value or {@link SelectStored} (used for disposal).
*/
enum ReceiveResult {
FAILED,
CLOSED
}

record SelectStored(Segment segment, int i) {}

/**
* Possible return values of {@code Channel#expandBuffer}: one of the enum constants below, or the received value.
*/
Expand All @@ -834,7 +974,7 @@ enum ExpandBufferResult {
CLOSED
}

// possible states of a cell: one of the enum constants below, Buffered, or Continuation
// possible states of a cell: one of the enum constants below, Buffered, Continuation or SelectInstance

enum CellState {
DONE,
Expand All @@ -853,7 +993,7 @@ final class Continuation {
* The number of busy-looping iterations before yielding, during {@link Continuation#await(Segment, int)}.
* {@code 0}, if there's a single CPU.
*/
private static final int SPINS = Runtime.getRuntime().availableProcessors() == 1 ? 0 : 10000;
static final int SPINS = Runtime.getRuntime().availableProcessors() == 1 ? 0 : 10000;

private final Thread creatingThread;
private volatile Object data; // set using DATA var handle
Expand Down
Loading

0 comments on commit 0e8f391

Please sign in to comment.