diff --git a/core/src/main/java/com/softwaremill/jox/Channel.java b/core/src/main/java/com/softwaremill/jox/Channel.java
index 956f6d4..fd4eb52 100644
--- a/core/src/main/java/com/softwaremill/jox/Channel.java
+++ b/core/src/main/java/com/softwaremill/jox/Channel.java
@@ -6,6 +6,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
+import java.util.function.Function;
import java.util.stream.Stream;
import static com.softwaremill.jox.CellState.*;
@@ -121,7 +122,9 @@ public Channel() {
* Capacity cannot be negative.
*/
public Channel(int capacity) {
- assert capacity >= 0 : "Capacity must be non-negative.";
+ if (capacity < 0) {
+ throw new IllegalArgumentException("Capacity must be non-negative.");
+ }
this.capacity = capacity;
isRendezvous = capacity == 0L;
@@ -277,7 +280,18 @@ private SendResult updateCellSend(Segment segment, int i, long s, T value) throw
segment.setCell(i, DONE);
return SendResult.RESUMED;
} else {
- // cell interrupted -> trying with a new one
+ // when cell interrupted -> trying with a new one
+ // when close in progress -> subsequent cells are already closed, this will be detected in the next iteration
+ return SendResult.FAILED;
+ }
+ }
+ case StoredSelect ss -> {
+ // a select clause is waiting -> trying to resume
+ if (ss.getSelect().trySelect(ss, value)) {
+ segment.setCell(i, DONE);
+ return SendResult.RESUMED;
+ } else {
+ // select unsuccessful -> trying with a new one
return SendResult.FAILED;
}
}
@@ -318,6 +332,14 @@ public T receive() throws InterruptedException {
* @return Either a value of type {@code T}, or {@link ChannelClosed}, when the channel is closed.
*/
public Object receiveSafe() throws InterruptedException {
+ return doReceive(null, null);
+ }
+
+ /**
+ * @return If {@code select} & {@code selectClause} is {@code null}: the received value, or {@link ChannelClosed},
+ * when the channel is closed. Otherwise, might also return {@link StoredSelect}.
+ */
+ private Object doReceive(SelectInstance select, SelectClause> selectClause) throws InterruptedException {
while (true) {
// reading the segment before the counter increment - this is needed to find the required segment later
var segment = receiveSegment.get();
@@ -344,23 +366,27 @@ public Object receiveSafe() throws InterruptedException {
}
}
- var result = updateCellReceive(segment, i, r);
+ var result = updateCellReceive(segment, i, r, select, selectClause);
if (result == ReceiveResult.CLOSED) {
// not cleaning the previous segments - the close procedure might still need it
return closedReason.get();
} else {
/*
- After `updateCellReceive` completes and the channel isn't closed, we can be sure that S > r:
+ After `updateCellReceive` completes and the channel isn't closed, we can be sure that S > r, unless
+ we stored the given select instance:
- if we stored and awaited a continuation, and it was resumed, then a sender must have appeared
- if we marked the cell as broken, then a sender is in progress in that cell
- if a continuation was present, then the sender must have been there
- if the cell was interrupted, that could have been only because of a sender
- if a value was buffered, that's because there was/is a matching sender
- The only case when S < r is when awaiting on the continuation is interrupted, in which case the
- exception propagates outside of this method.
+ The only cases when S <= r are when:
+ - awaiting on the continuation is interrupted, in which case the exception propagates outside of this method
+ - we stored the given select instance (in an empty / in-buffer cell)
*/
- segment.cleanPrev();
+ if (!(result instanceof StoredSelect)) {
+ segment.cleanPrev();
+ }
if (result != ReceiveResult.FAILED) {
return result;
}
@@ -371,13 +397,16 @@ public Object receiveSafe() throws InterruptedException {
/**
* Invariant maintained by receive + expandBuffer: between R and B the number of cells that are empty / IN_BUFFER should be equal
* to the buffer size. These are the cells that can accept a sender without suspension.
+ *
+ * This method might suspend (and be interrupted) only if {@code select} is {@code null}.
*
* @param segment The segment which stores the cell's state.
* @param i The index within the {@code segment}.
* @param r Index of the reserved cell.
- * @return Either a state-result ({@link ReceiveResult}), or the received value.
+ * @param select The select instance of which this receive is part of, or {@code null} (along with {@code selectClause}) if this is a direct receive call.
+ * @return Either a state-result ({@link ReceiveResult}), {@link StoredSelect} in case {@code select} is not {@code null}, or the received value.
*/
- private Object updateCellReceive(Segment segment, int i, long r) throws InterruptedException {
+ private Object updateCellReceive(Segment segment, int i, long r, SelectInstance select, SelectClause> selectClause) throws InterruptedException {
while (true) {
var state = segment.getCell(i); // reading the current state of the cell; we'll try to update it atomically
var switchState = state == null ? IN_BUFFER : state; // we can't combine null+IN_BUFFER in the switch statement, hence cheating a bit here
@@ -385,19 +414,30 @@ private Object updateCellReceive(Segment segment, int i, long r) throws Interrup
switch (switchState) {
case IN_BUFFER -> { // means that state == null || state == IN_BUFFER
if (r >= getSendersCounter(sendersAndClosedFlag.get())) { // reading the sender's counter
- // cell is empty, and no sender -> suspend
- // not using any payload
- var c = new Continuation(null);
- if (segment.casCell(i, state, c)) {
- expandBuffer();
- var result = c.await(segment, i);
- if (result == ChannelClosedMarker.CLOSED) {
- return ReceiveResult.CLOSED;
- } else {
- return result;
+ if (select != null) {
+ // cell is empty, no sender, and we are in a select -> store the select instance
+ // and await externally
+ var storedSelect = new StoredSelect(select, segment, i, false, selectClause);
+ if (segment.casCell(i, state, storedSelect)) {
+ expandBuffer();
+ return storedSelect;
}
+ // else: CAS unsuccessful, repeat
+ } else {
+ // cell is empty, and no sender -> suspend
+ // not using any payload
+ var c = new Continuation(null);
+ if (segment.casCell(i, state, c)) {
+ expandBuffer();
+ var result = c.await(segment, i);
+ if (result == ChannelClosedMarker.CLOSED) {
+ return ReceiveResult.CLOSED;
+ } else {
+ return result;
+ }
+ }
+ // else: CAS unsuccessful, repeat
}
- // else: CAS unsuccessful, repeat
} else {
// sender in progress, receiver changed state first -> restart
if (segment.casCell(i, state, BROKEN)) {
@@ -408,6 +448,7 @@ private Object updateCellReceive(Segment segment, int i, long r) throws Interrup
}
}
case Continuation c -> {
+ // resolving a potential race with `expandBuffer`
if (segment.casCell(i, state, RESUMING)) {
// a sender is waiting -> trying to resume
if (c.tryResume(0)) {
@@ -415,13 +456,16 @@ private Object updateCellReceive(Segment segment, int i, long r) throws Interrup
expandBuffer();
return c.getPayload();
} else {
- // cell interrupted -> trying with a new one
+ // when cell interrupted -> trying with a new one
// the state will be set to INTERRUPTED_SEND by the continuation, meanwhile everybody else will observe RESUMING
+ // when close in progress -> the cell state will be updated to CLOSED, subsequent cells are already closed,
+ // which will be detected in the next iteration
return ReceiveResult.FAILED;
}
}
// else: CAS unsuccessful, repeat
}
+ // TODO: StoredSelect
case Buffered b -> {
segment.setCell(i, DONE);
expandBuffer();
@@ -487,7 +531,7 @@ private void expandBuffer() {
// the cell is already counted as processed by the close procedure
return;
}
- // else, the cell must have been an interrupted sender; `Continuation` the properly notifies the segment
+ // else, the cell must have been an interrupted sender; `Continuation` properly notifies the segment
}
}
@@ -503,8 +547,10 @@ private ExpandBufferResult updateCellExpandBuffer(Segment segment, int i) {
segment.setCell(i, new Buffered(c.getPayload()));
return ExpandBufferResult.DONE;
} else {
- // cell interrupted -> trying with a new one
+ // when cell interrupted -> trying with a new one
// the state will be set to INTERRUPTED_SEND by the continuation, meanwhile everybody else will observe RESUMING
+ // when close in progress -> the cell state will be updated to CLOSED, subsequent cells are already closed,
+ // which will be detected in the next iteration
return ExpandBufferResult.FAILED;
}
}
@@ -514,6 +560,26 @@ private ExpandBufferResult updateCellExpandBuffer(Segment segment, int i) {
// must be a receiver continuation - another buffer expansion already happened
return ExpandBufferResult.DONE;
}
+ case StoredSelect ss when ss.isSender() -> {
+ // TODO
+ throw new UnsupportedOperationException();
+// if (segment.casCell(i, state, RESUMING)) {
+// // a sender is waiting -> trying to resume
+// if (ss.getSelect().trySelect(ss,0)) {
+// segment.setCell(i, new Buffered(c.getPayload()));
+// return ExpandBufferResult.DONE;
+// } else {
+// // select unsuccessful -> trying with a new one
+// // the state will be set to INTERRUPTED_SEND by the continuation, meanwhile everybody else will observe RESUMING
+// return ExpandBufferResult.FAILED;
+// }
+// }
+// // else: CAS unsuccessful, repeat
+ }
+ case StoredSelect ss -> {
+ // must be a receiver clause of the select - another buffer expansion already happened
+ return ExpandBufferResult.DONE;
+ }
case Buffered b -> {
// an element is already buffered; if the ordering of operations was different, we would put IN_BUFFER in that cell and finish
return ExpandBufferResult.DONE;
@@ -697,17 +763,30 @@ private void updateCellClose(Segment segment, int i) {
}
}
case Continuation c -> {
- if (segment.casCell(i, state, RESUMING)) {
- if (c.tryResume(ChannelClosedMarker.CLOSED)) {
- segment.setCell(i, CLOSED);
- segment.cellInterruptedSender_orClosed();
- return;
- } else {
- // cell interrupted - the segment counters will be appropriately decremented from the
- // continuation, depending if this is a sender or receiver; moreover, the cell is already
- // processed in case this is a receiver
- return;
- }
+ // potential race wih sender/receiver resuming the continuation - resolved by synchronizing on
+ // `Continuation.data`: only one thread will successfully change its value from `null`
+ if (c.tryResume(ChannelClosedMarker.CLOSED)) {
+ segment.setCell(i, CLOSED);
+ segment.cellInterruptedSender_orClosed();
+ return;
+ } else {
+ // when cell interrupted - the segment counters will be appropriately decremented from the
+ // continuation, depending on if this is a sender or receiver; moreover, the cell is already
+ // processed in case this is a receiver
+ // otherwise, the cell might be completed with a value, and the result is processed normally
+ // on another thread
+ return;
+ }
+ }
+ case StoredSelect ss -> {
+ if (ss.getSelect().channelClosed(closedReason.get())) {
+ segment.setCell(i, CLOSED);
+ segment.cellInterruptedSender_orClosed();
+ return;
+ } else {
+ // the select hasn't been closed; instead, it will clean up all cells as part of handling the
+ // non-closed state in its main loop
+ return;
}
}
case DONE, BROKEN -> {
@@ -742,6 +821,63 @@ public Throwable isError() {
}
}
+ // **************
+ // Select clauses
+ // **************
+
+ private static final Function