Skip to content

Commit

Permalink
Updated the FlowEngine so nodes that have to be updated in the curren…
Browse files Browse the repository at this point in the history
…t cycle cannot be scheduled twice. (#284)

Updated the FlowNodeQueue
  • Loading branch information
DanteNiewenhuis authored Dec 17, 2024
1 parent f55aaed commit c425a03
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@ public void removeConsumerEdge(FlowEdge consumerEdge) {
*/
@Override
public void removeSupplierEdge(FlowEdge supplierEdge) {
if (this.machineEdge == null) {
return;
}

this.stopWorkload();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ public long onUpdate(long now) {

@Override
public void stopWorkload() {
if (this.machineEdge == null) {
return;
}

this.closeNode();

this.machineEdge = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,32 @@

/**
* A specialized {@link ArrayDeque} implementation that contains the {@link FlowNode}s
* that have been updated during the engine cycle and should converge.
* that should be updated in the current cycle, because of a change caused by another update in the current cycle.
* <p>
* By using a specialized class, we reduce the overhead caused by type-erasure.
*/
final class FlowNodeQueue {
final class FlowCycleQueue {
/**
* The array of elements in the queue.
*/
private FlowNode[] elements;
private FlowNode[] nodeQueue;

private int head = 0;
private int tail = 0;

public FlowNodeQueue(int initialCapacity) {
elements = new FlowNode[initialCapacity];
public FlowCycleQueue(int initialCapacity) {
nodeQueue = new FlowNode[initialCapacity];
}

/**
* Add the specified context to the queue.
*/
void add(FlowNode ctx) {
final FlowNode[] es = elements;
if (ctx.getInCycleQueue()) {
return;
}

final FlowNode[] es = nodeQueue;
int tail = this.tail;

es[tail] = ctx;
Expand All @@ -60,19 +64,22 @@ void add(FlowNode ctx) {
if (head == tail) {
doubleCapacity();
}

ctx.setInCycleQueue(true);
}

/**
* Remove a {@link FlowNode} from the queue or <code>null</code> if the queue is empty.
*/
FlowNode poll() {
final FlowNode[] es = elements;
final FlowNode[] es = nodeQueue;
int head = this.head;
FlowNode ctx = es[head];

if (ctx != null) {
es[head] = null;
this.head = inc(head, es.length);
ctx.setInCycleQueue(false);
}

return ctx;
Expand All @@ -82,13 +89,13 @@ FlowNode poll() {
* Doubles the capacity of this deque
*/
private void doubleCapacity() {
int oldCapacity = elements.length;
int oldCapacity = nodeQueue.length;
int newCapacity = oldCapacity + (oldCapacity >> 1);
if (newCapacity < 0) {
throw new IllegalStateException("Sorry, deque too big");
}

final FlowNode[] es = elements = Arrays.copyOf(elements, newCapacity);
final FlowNode[] es = nodeQueue = Arrays.copyOf(nodeQueue, newCapacity);

// Exceptionally, here tail == head needs to be disambiguated
if (tail < head || (tail == head && es[head] != null)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
*/
public final class FlowEngine implements Runnable {
/**
* The queue of {@link FlowNode} updates that are scheduled for immediate execution.
* The queue of {@link FlowNode} updates that need to be updated in the current cycle.
*/
private final FlowNodeQueue queue = new FlowNodeQueue(256);
private final FlowCycleQueue cycleQueue = new FlowCycleQueue(256);

/**
* A priority queue containing the {@link FlowNode} updates to be scheduled in the future.
Expand Down Expand Up @@ -112,7 +112,7 @@ public void scheduleImmediate(long now, FlowNode ctx) {
* This method should only be invoked while inside an engine cycle.
*/
public void scheduleImmediateInContext(FlowNode ctx) {
queue.add(ctx);
cycleQueue.add(ctx);
}

/**
Expand Down Expand Up @@ -147,7 +147,7 @@ public void scheduleDelayedInContext(FlowNode ctx) {
* Run all the enqueued actions for the specified timestamp (<code>now</code>).
*/
private void doRunEngine(long now) {
final FlowNodeQueue queue = this.queue;
final FlowCycleQueue queue = this.cycleQueue;
final FlowTimerQueue timerQueue = this.timerQueue;

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ public void setTimerIndex(int index) {
this.timerIndex = index;
}

public Boolean getInCycleQueue() {
return inCycleQueue;
}

public void setInCycleQueue(Boolean inCycleQueue) {
this.inCycleQueue = inCycleQueue;
}

public InstantSource getClock() {
return clock;
}
Expand Down Expand Up @@ -105,6 +113,8 @@ public void setDeadline(long deadline) {
*/
private int timerIndex = -1;

private Boolean inCycleQueue = false;

protected InstantSource clock;
protected FlowGraph parentGraph;
protected FlowEngine engine;
Expand Down Expand Up @@ -139,9 +149,9 @@ public FlowNode(FlowGraph parentGraph) {
public void invalidate(long now) {
// If there is already an update running,
// notify the update, that a next update should be run after
if (this.nodeState == NodeState.UPDATING) {

if (this.nodeState != NodeState.CLOSING && this.nodeState != NodeState.CLOSED) {
this.nodeState = NodeState.INVALIDATED;
} else {
engine.scheduleImmediate(now, this);
}
}
Expand Down Expand Up @@ -172,15 +182,16 @@ public void update(long now) {
doFail(e);
}

// Check whether the stage is marked as closing.
if (this.nodeState == NodeState.INVALIDATED) {
newDeadline = now;
}
if (this.nodeState == NodeState.CLOSING) {
closeNode();
return;
}

// Check whether the stage is marked as closing.
if ((this.nodeState == NodeState.INVALIDATED) || (this.nodeState == NodeState.CLOSED)) {
return;
}

this.deadline = newDeadline;

// Update the timer queue with the new deadline
Expand Down Expand Up @@ -211,14 +222,6 @@ void doFail(Throwable cause) {
*/
public void closeNode() {
if (this.nodeState == NodeState.CLOSED) {
// LOGGER.warn("Flowstage:doClose() => Tried closing a stage that was already closed");
return;
}

// If this stage is running an update, notify it that is should close after.
if (this.nodeState == NodeState.UPDATING) {
// LOGGER.warn("Flowstage:doClose() => Tried closing a stage, but update was active");
this.nodeState = NodeState.CLOSING;
return;
}

Expand Down

0 comments on commit c425a03

Please sign in to comment.