Skip to content

Commit

Permalink
conflict resolved, should work with new VMF+ReadOnly and new executor…
Browse files Browse the repository at this point in the history
… class names
  • Loading branch information
miho committed Nov 9, 2021
2 parents a1c4908 + cd81881 commit 0686e8e
Showing 5 changed files with 1,402 additions and 1,256 deletions.
Original file line number Diff line number Diff line change
@@ -33,12 +33,16 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* An async executor for state machines.
*/
class FSMExecutor implements AsyncFSMExecutor {


private final Deque<Event> evtQueue = new ConcurrentLinkedDeque<>();
private Thread doActionThread;
private CompletableFuture<Void> doActionFuture;
private Thread executionThread;
private volatile Thread doActionThread;
private volatile CompletableFuture<Void> doActionFuture;
private volatile Thread executionThread;
private final Map<State, Boolean> stateExited = new HashMap<>();
private final int depth;
private final FSM fsm;
@@ -49,10 +53,10 @@ class FSMExecutor implements AsyncFSMExecutor {

private final AsyncFSMExecutor.ExecutionMode mode;

private static final long MAX_EVT_CONSUMED_ACTION_TIMEOUT = 1000 /*ms*/;
private static final long MAX_ENTER_ACTION_TIMEOUT = 1000 /*ms*/;
private static final long MAX_EXIT_ACTION_TIMEOUT = 1000 /*ms*/;
private static final long MAX_TRANSITION_ACTION_TIMEOUT = 1000 /*ms*/;
private static final long MAX_EVT_CONSUMED_ACTION_TIMEOUT = 10_000 /*ms*/;
private static final long MAX_ENTER_ACTION_TIMEOUT = 10_000 /*ms*/;
private static final long MAX_EXIT_ACTION_TIMEOUT = 10_000 /*ms*/;
private static final long MAX_TRANSITION_ACTION_TIMEOUT = 10_000 /*ms*/;

private static Optional<FSMExecutor> getLCA(FSMExecutor a, FSMExecutor b) {
int start = Math.min(a.pathToRoot.size(), b.pathToRoot.size());
@@ -77,6 +81,12 @@ private FSMExecutor(FSM fsm, ExecutionMode mode, int depth, FSMExecutor parent)
pathToRoot.add(this);
}

/**
* Creates a new async executor instance.
* @param fsm the fsm to execute
* @param mode the execution mode
* @return the new executor instance
*/
public static FSMExecutor newInstance(FSM fsm, ExecutionMode mode) {
return new FSMExecutor(fsm, mode, 0, null);
}
@@ -128,7 +138,34 @@ public void triggerFirst(Event event) {
}
}

@Override
public boolean process(Event evt) {

if(executorRunning.get()) {
throw new RuntimeException(
"Cannot call 'process()' if machine is already running,"+
" try calling trigger(). The 'process()' method triggers and" +
" processes the event in a single method call.");
}

try {
trigger(evt);
return processRemainingEvents();
} finally {
//
}
}

@Override
public boolean process(String evt, EventConsumedAction onConsumed, Object... args) {

if(executorRunning.get()) {
throw new RuntimeException(
"Cannot call 'process()' if machine is already running,"+
" try calling trigger(). The 'process()' method triggers and" +
" processes the event in a single method call.");
}

try {
trigger(evt, onConsumed, args);
return processRemainingEvents();
@@ -140,12 +177,12 @@ public boolean process(String evt, EventConsumedAction onConsumed, Object... arg
@Override
public boolean process(String evt, Object... args) {

// if() {
// throw new RuntimeException(
// "Cannot call 'process()' if machine is already running,"+
// " try calling trigger(). The 'process()' method triggers and" +
// " processes the event in a single method call.");
// }
if(executorRunning.get()) {
throw new RuntimeException(
"Cannot call 'process()' if machine is already running,"+
" try calling trigger(). The 'process()' method triggers and" +
" processes the event in a single method call.");
}

try {
trigger(evt, args);
@@ -184,10 +221,11 @@ public void accessFSMSafe(Consumer<FSM> fsmTask) {
}
}

boolean firedFinalState = false;
boolean firedDoActionDone = false;
boolean firedStateDone = false;
private volatile boolean firedFinalState = false;
private volatile boolean firedDoActionDone = false;
private volatile boolean firedStateDone = false;

@Override
public boolean processRemainingEvents() {

// everything modified concurrently with start(), reset(), stop() etc. must be inside
@@ -218,31 +256,31 @@ public boolean processRemainingEvents() {
boolean consumed = false;
State prevState = getCaller().getCurrentState();

if(prevState instanceof FSMState) {
// if we are in a state with nested fsm we process any upcoming events even if we don't
// currently have events in our queue
if (prevState instanceof FSMState) {
FSMState fsmState = (FSMState) prevState;
for (FSM childFSM : fsmState.getFSMs()) {
if (childFSM != null) {
childFSM.getExecutor().processRemainingEvents();
}
} // end for each child fsm

boolean allMatch = fsmState.getFSMs().stream()
.allMatch(fsm->!fsm.isRunning()&&fsm.getFinalState().contains(fsm.getCurrentState()));

if(allMatch && !firedFinalState) {
log("> triggering final-state, currently in state " + prevState.getName());
triggerFirst(Event.newBuilder().withName(FSMEvents.FINAL_STATE.getName()).withLocal(true)
.withArgs(fsmState.getName()+":"+System.identityHashCode(fsmState)).build());
firedFinalState = true;
log(" -> final state reached via: "
+ fsmState.getFSMs().stream().map(cfsm->cfsm.getName()).collect(Collectors.toList()));
// if we are in a state with nested fsm we process any upcoming events even if we don't
// currently have events in our queue
if (prevState instanceof FSMState) {
FSMState fsmState = (FSMState) prevState;
for (FSM childFSM : fsmState.getFSMs()) {
if (childFSM != null) {
childFSM.getExecutor().processRemainingEvents();
}
} // end for each child fsm

boolean allMatch = fsmState.getFSMs().stream()
.allMatch(fsm -> !fsm.isRunning() && fsm.getFinalState().contains(fsm.getCurrentState()));

if (allMatch && !firedFinalState) {
log("> triggering final-state, currently in state " + prevState.getName());
triggerFirst(Event.newBuilder().withName(FSMEvents.FINAL_STATE.getName()).withLocal(true)
.withArgs(fsmState.getName() + ":" + System.identityHashCode(fsmState)).build());
firedFinalState = true;
log(" -> final state reached via: "
+ fsmState.getFSMs().stream().map(cfsm -> cfsm.getName()).collect(Collectors.toList()));
}
}


for (Iterator<Event> iter = evtQueue.iterator(); iter.hasNext() && getCaller().isRunning(); ) {

fsmLock.lock();
@@ -298,14 +336,7 @@ public boolean processRemainingEvents() {
removed = removedParam.get();
consumed = consumedParam.get();
} else {
// if(!firedFinalState) {
// triggerFirst(Event.newBuilder()
// .withName(FSMEvents.FINAL_STATE.getName())
// .withLocal(true).build());
// log("> triggering final-state, currently in state " + currentState.getName());
// log(" -> final state reached via: current state (no children available)");
// firedFinalState = true;
// }
//
}

if(FSMEvents.FINAL_STATE.getName().equals(evt.getName())) {
@@ -787,7 +818,7 @@ private boolean executeDoActionOfNewState(Event evt, State oldState, State newSt
return true;
}

//@Override
@Override
public void exitDoActionOfState(Event evt, State state) {
this.exitDoActionOfOldState(evt, state, null);
}
@@ -900,6 +931,7 @@ private boolean defers(State s, Event evt) {
|| s.getDeferredEvents().stream().anyMatch(dE-> Pattern.matches(dE, evt.getName()));
}

@Override
public void startAndWait() {

var f = new CompletableFuture();
@@ -923,46 +955,59 @@ public void startAndWait() {
private long duration2 = 100;
private long duration3 = 10;
private long waitTime = 10;

private final AtomicBoolean executorRunning = new AtomicBoolean();

private void start_int() {
while(getCaller().isRunning()&&!Thread.currentThread().isInterrupted()) {
try {
long currentTime = System.currentTimeMillis();
boolean eventsProcessed = getCaller().getExecutor().processRemainingEvents();
try {
executorRunning.set(true);
while (getCaller().isRunning() && !Thread.currentThread().isInterrupted()) {
try {
long currentTime = System.currentTimeMillis();
boolean eventsProcessed = getCaller().getExecutor().processRemainingEvents();

if(Thread.currentThread() == executionThread) {
if(eventsProcessed||timestamp==0) {
timestamp = currentTime;
}
if (Thread.currentThread() == executionThread) {
if (eventsProcessed || timestamp == 0) {
timestamp = currentTime;
}

long timeDiff = currentTime - timestamp;
if (timeDiff > duration1) {
waitTime = 100;
} else if (timeDiff > duration2) {
waitTime = 10;
} else if (timeDiff > duration3) {
waitTime = 1;
} else {
// full speed
waitTime = 0;
}
long timeDiff = currentTime - timestamp;
if (timeDiff > duration1) {
waitTime = 100;
} else if (timeDiff > duration2) {
waitTime = 10;
} else if (timeDiff > duration3) {
waitTime = 1;
} else {
// full speed
waitTime = 0;
}

try {
synchronized (executionThread) {
if(waitTime>0) {
executionThread.wait(waitTime);
try {
synchronized (executionThread) {
if (waitTime > 0) {
executionThread.wait(waitTime);
}
}
} catch (InterruptedException iEx) {
Thread.currentThread().interrupt();
}
} catch (InterruptedException iEx) {
Thread.currentThread().interrupt();
}
} catch (Exception ex) {
Thread.currentThread().interrupt();
throw ex;
}
} catch (Exception ex) {
Thread.currentThread().interrupt();
throw ex;
}
} finally {
executorRunning.set(false);
}
}

@Override
public boolean isRunning() {
return executorRunning.get();
}

@Override
public CompletableFuture<Void> startAsync() {

Loading

0 comments on commit 0686e8e

Please sign in to comment.