From 251b595016e4cd8da9d0b4125e4048dd185b41ee Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Thu, 1 May 2014 07:50:19 -0400 Subject: [PATCH] Fixed async launch logic for finite apps --- .../demo/JavaBasedYarnApplicationClusterDemo.java | 6 ++++++ .../api/core/AbstractApplicationMasterLauncher.java | 12 ++++++++++-- .../hadoop/yarn/api/core/ApplicationMasterTests.java | 7 +++++++ .../yarn/api/core/RealApplicationMasterTests.java | 7 +++++++ 4 files changed, 30 insertions(+), 2 deletions(-) diff --git a/yaya-demo/src/main/java/yarn/demo/JavaBasedYarnApplicationClusterDemo.java b/yaya-demo/src/main/java/yarn/demo/JavaBasedYarnApplicationClusterDemo.java index 54bb8a0..f8d5fe2 100644 --- a/yaya-demo/src/main/java/yarn/demo/JavaBasedYarnApplicationClusterDemo.java +++ b/yaya-demo/src/main/java/yarn/demo/JavaBasedYarnApplicationClusterDemo.java @@ -54,6 +54,7 @@ public static void main(String[] args) throws Exception { build("JavaBasedYarnApplicationDemo"); yarnApplication.launch(); + System.out.println(); /* * This demo demonstrates self-shutdown where application will exit * upon completion of tasks by all containers. @@ -73,6 +74,11 @@ public ByteBuffer process(ByteBuffer inputMessage) { String strMessage = new String(inputBytes); strMessage = new StringBuilder(strMessage).reverse().toString(); System.out.println("Processing input: " + strMessage); + try { + Thread.sleep(5000); + } catch (Exception e) { + // TODO: handle exception + } return null; // You can also return ByteBuffer, but since its a finite container // the contents of the returned ByteBuffer will be logged (see JavaBasedYarnApplicationEmulatorDemo) diff --git a/yaya/src/main/java/oz/hadoop/yarn/api/core/AbstractApplicationMasterLauncher.java b/yaya/src/main/java/oz/hadoop/yarn/api/core/AbstractApplicationMasterLauncher.java index 1d6f895..a620099 100644 --- a/yaya/src/main/java/oz/hadoop/yarn/api/core/AbstractApplicationMasterLauncher.java +++ b/yaya/src/main/java/oz/hadoop/yarn/api/core/AbstractApplicationMasterLauncher.java @@ -203,8 +203,16 @@ public void doProcess(ByteBuffer reply) { } }); } - - this.clientServer.stop(false); + /* + * By initiating a graceful shutdown we simply sending a signal + * for an application to stop once complete. + */ + this.executor.execute(new Runnable() { + @Override + public void run() { + clientServer.stop(false); + } + }); returnValue = null; } return returnValue; diff --git a/yaya/src/test/java/oz/hadoop/yarn/api/core/ApplicationMasterTests.java b/yaya/src/test/java/oz/hadoop/yarn/api/core/ApplicationMasterTests.java index f00b712..fb2737f 100644 --- a/yaya/src/test/java/oz/hadoop/yarn/api/core/ApplicationMasterTests.java +++ b/yaya/src/test/java/oz/hadoop/yarn/api/core/ApplicationMasterTests.java @@ -29,6 +29,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; import org.junit.Test; @@ -54,6 +55,9 @@ public void validateFiniteContainerSelfShutdown() throws Exception { ApplicationMasterLauncher amLauncher = new ApplicationMasterLauncherEmulatorImpl<>(applicationSpec); amLauncher.launch(); + while (amLauncher.isRunning()){ + LockSupport.parkNanos(1000000); + } assertFalse(amLauncher.isRunning()); } @@ -218,6 +222,9 @@ public void onReply(ByteBuffer replyData) { } }); amLauncher.launch(); + while (amLauncher.isRunning()){ + LockSupport.parkNanos(1000000); + } assertFalse(amLauncher.isRunning()); assertEquals(expectedContainers, replySucceses.get()); } diff --git a/yaya/src/test/java/oz/hadoop/yarn/api/core/RealApplicationMasterTests.java b/yaya/src/test/java/oz/hadoop/yarn/api/core/RealApplicationMasterTests.java index be6a125..5b3622a 100644 --- a/yaya/src/test/java/oz/hadoop/yarn/api/core/RealApplicationMasterTests.java +++ b/yaya/src/test/java/oz/hadoop/yarn/api/core/RealApplicationMasterTests.java @@ -29,6 +29,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; import org.junit.Test; @@ -54,6 +55,9 @@ public void validateFiniteContainerSelfShutdown() throws Exception { ApplicationMasterLauncher amLauncher = new ApplicationMasterLauncherEmulatorImpl<>(applicationSpec); amLauncher.launch(); + while (amLauncher.isRunning()){ + LockSupport.parkNanos(1000000); + } assertFalse(amLauncher.isRunning()); } @@ -218,6 +222,9 @@ public void onReply(ByteBuffer replyData) { } }); amLauncher.launch(); + while (amLauncher.isRunning()){ + LockSupport.parkNanos(100000); + } assertFalse(amLauncher.isRunning()); assertEquals(expectedContainers, replySucceses.get()); }