Skip to content

Commit

Permalink
Fixed async launch logic for finite apps
Browse files Browse the repository at this point in the history
  • Loading branch information
olegz committed May 1, 2014
1 parent f379919 commit 251b595
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public static void main(String[] args) throws Exception {
build("JavaBasedYarnApplicationDemo");

yarnApplication.launch();
System.out.println();
/*
* This demo demonstrates self-shutdown where application will exit
* upon completion of tasks by all containers.
Expand All @@ -73,6 +74,11 @@ public ByteBuffer process(ByteBuffer inputMessage) {
String strMessage = new String(inputBytes);
strMessage = new StringBuilder(strMessage).reverse().toString();
System.out.println("Processing input: " + strMessage);
try {
Thread.sleep(5000);
} catch (Exception e) {
// TODO: handle exception
}
return null;
// You can also return ByteBuffer, but since its a finite container
// the contents of the returned ByteBuffer will be logged (see JavaBasedYarnApplicationEmulatorDemo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,16 @@ public void doProcess(ByteBuffer reply) {
}
});
}

this.clientServer.stop(false);
/*
* By initiating a graceful shutdown we simply sending a signal
* for an application to stop once complete.
*/
this.executor.execute(new Runnable() {
@Override
public void run() {
clientServer.stop(false);
}
});
returnValue = null;
}
return returnValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

import org.junit.Test;

Expand All @@ -54,6 +55,9 @@ public void validateFiniteContainerSelfShutdown() throws Exception {

ApplicationMasterLauncher<Void> amLauncher = new ApplicationMasterLauncherEmulatorImpl<>(applicationSpec);
amLauncher.launch();
while (amLauncher.isRunning()){
LockSupport.parkNanos(1000000);
}
assertFalse(amLauncher.isRunning());
}

Expand Down Expand Up @@ -218,6 +222,9 @@ public void onReply(ByteBuffer replyData) {
}
});
amLauncher.launch();
while (amLauncher.isRunning()){
LockSupport.parkNanos(1000000);
}
assertFalse(amLauncher.isRunning());
assertEquals(expectedContainers, replySucceses.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

import org.junit.Test;

Expand All @@ -54,6 +55,9 @@ public void validateFiniteContainerSelfShutdown() throws Exception {

ApplicationMasterLauncher<Void> amLauncher = new ApplicationMasterLauncherEmulatorImpl<>(applicationSpec);
amLauncher.launch();
while (amLauncher.isRunning()){
LockSupport.parkNanos(1000000);
}
assertFalse(amLauncher.isRunning());
}

Expand Down Expand Up @@ -218,6 +222,9 @@ public void onReply(ByteBuffer replyData) {
}
});
amLauncher.launch();
while (amLauncher.isRunning()){
LockSupport.parkNanos(100000);
}
assertFalse(amLauncher.isRunning());
assertEquals(expectedContainers, replySucceses.get());
}
Expand Down

0 comments on commit 251b595

Please sign in to comment.