diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java index 75bdde923..b612de2c1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java @@ -270,6 +270,10 @@ public void removeConsumerEdge(FlowEdge consumerEdge) { */ @Override public void removeSupplierEdge(FlowEdge supplierEdge) { + if (this.machineEdge == null) { + return; + } + this.stopWorkload(); } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java index fbbe08151..8487fbc28 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java @@ -139,6 +139,10 @@ public long onUpdate(long now) { @Override public void stopWorkload() { + if (this.machineEdge == null) { + return; + } + this.closeNode(); this.machineEdge = null; diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java similarity index 83% rename from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java rename to opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java index bd6220837..72dd217cf 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java @@ -28,28 +28,32 @@ /** * A specialized {@link ArrayDeque} implementation that contains the {@link FlowNode}s - * that have been updated during the engine cycle and should converge. + * that should be updated in the current cycle, because of a change caused by another update in the current cycle. *

* By using a specialized class, we reduce the overhead caused by type-erasure. */ -final class FlowNodeQueue { +final class FlowCycleQueue { /** * The array of elements in the queue. */ - private FlowNode[] elements; + private FlowNode[] nodeQueue; private int head = 0; private int tail = 0; - public FlowNodeQueue(int initialCapacity) { - elements = new FlowNode[initialCapacity]; + public FlowCycleQueue(int initialCapacity) { + nodeQueue = new FlowNode[initialCapacity]; } /** * Add the specified context to the queue. */ void add(FlowNode ctx) { - final FlowNode[] es = elements; + if (ctx.getInCycleQueue()) { + return; + } + + final FlowNode[] es = nodeQueue; int tail = this.tail; es[tail] = ctx; @@ -60,19 +64,22 @@ void add(FlowNode ctx) { if (head == tail) { doubleCapacity(); } + + ctx.setInCycleQueue(true); } /** * Remove a {@link FlowNode} from the queue or null if the queue is empty. */ FlowNode poll() { - final FlowNode[] es = elements; + final FlowNode[] es = nodeQueue; int head = this.head; FlowNode ctx = es[head]; if (ctx != null) { es[head] = null; this.head = inc(head, es.length); + ctx.setInCycleQueue(false); } return ctx; @@ -82,13 +89,13 @@ FlowNode poll() { * Doubles the capacity of this deque */ private void doubleCapacity() { - int oldCapacity = elements.length; + int oldCapacity = nodeQueue.length; int newCapacity = oldCapacity + (oldCapacity >> 1); if (newCapacity < 0) { throw new IllegalStateException("Sorry, deque too big"); } - final FlowNode[] es = elements = Arrays.copyOf(elements, newCapacity); + final FlowNode[] es = nodeQueue = Arrays.copyOf(nodeQueue, newCapacity); // Exceptionally, here tail == head needs to be disambiguated if (tail < head || (tail == head && es[head] != null)) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java index 1a068b40d..24476048e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java @@ -37,9 +37,9 @@ */ public final class FlowEngine implements Runnable { /** - * The queue of {@link FlowNode} updates that are scheduled for immediate execution. + * The queue of {@link FlowNode} updates that need to be updated in the current cycle. */ - private final FlowNodeQueue queue = new FlowNodeQueue(256); + private final FlowCycleQueue cycleQueue = new FlowCycleQueue(256); /** * A priority queue containing the {@link FlowNode} updates to be scheduled in the future. @@ -112,7 +112,7 @@ public void scheduleImmediate(long now, FlowNode ctx) { * This method should only be invoked while inside an engine cycle. */ public void scheduleImmediateInContext(FlowNode ctx) { - queue.add(ctx); + cycleQueue.add(ctx); } /** @@ -147,7 +147,7 @@ public void scheduleDelayedInContext(FlowNode ctx) { * Run all the enqueued actions for the specified timestamp (now). */ private void doRunEngine(long now) { - final FlowNodeQueue queue = this.queue; + final FlowCycleQueue queue = this.cycleQueue; final FlowTimerQueue timerQueue = this.timerQueue; try { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java index b28271303..64cd0d8c6 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java @@ -60,6 +60,14 @@ public void setTimerIndex(int index) { this.timerIndex = index; } + public Boolean getInCycleQueue() { + return inCycleQueue; + } + + public void setInCycleQueue(Boolean inCycleQueue) { + this.inCycleQueue = inCycleQueue; + } + public InstantSource getClock() { return clock; } @@ -105,6 +113,8 @@ public void setDeadline(long deadline) { */ private int timerIndex = -1; + private Boolean inCycleQueue = false; + protected InstantSource clock; protected FlowGraph parentGraph; protected FlowEngine engine; @@ -139,9 +149,9 @@ public FlowNode(FlowGraph parentGraph) { public void invalidate(long now) { // If there is already an update running, // notify the update, that a next update should be run after - if (this.nodeState == NodeState.UPDATING) { + + if (this.nodeState != NodeState.CLOSING && this.nodeState != NodeState.CLOSED) { this.nodeState = NodeState.INVALIDATED; - } else { engine.scheduleImmediate(now, this); } } @@ -172,15 +182,16 @@ public void update(long now) { doFail(e); } - // Check whether the stage is marked as closing. - if (this.nodeState == NodeState.INVALIDATED) { - newDeadline = now; - } if (this.nodeState == NodeState.CLOSING) { closeNode(); return; } + // Check whether the stage is marked as closing. + if ((this.nodeState == NodeState.INVALIDATED) || (this.nodeState == NodeState.CLOSED)) { + return; + } + this.deadline = newDeadline; // Update the timer queue with the new deadline @@ -211,14 +222,6 @@ void doFail(Throwable cause) { */ public void closeNode() { if (this.nodeState == NodeState.CLOSED) { - // LOGGER.warn("Flowstage:doClose() => Tried closing a stage that was already closed"); - return; - } - - // If this stage is running an update, notify it that is should close after. - if (this.nodeState == NodeState.UPDATING) { - // LOGGER.warn("Flowstage:doClose() => Tried closing a stage, but update was active"); - this.nodeState = NodeState.CLOSING; return; }