From b0e27bf39088261788c8abfacb5609c859977c90 Mon Sep 17 00:00:00 2001 From: billy1380 Date: Fri, 30 Jan 2015 19:20:24 +0000 Subject: [PATCH 1/9] added method to get PromiseValue using handle --- .../google/appengine/tools/pipeline/Job.java | 10 +++ .../tools/pipeline/impl/PipelineManager.java | 61 +++++++++++++++++++ .../pipeline/impl/PromisedValueImpl.java | 4 ++ 3 files changed, 75 insertions(+) diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/Job.java b/java/src/main/java/com/google/appengine/tools/pipeline/Job.java index 3e6ee155..ea7fe7e4 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/Job.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/Job.java @@ -20,6 +20,7 @@ import com.google.appengine.tools.pipeline.impl.PromisedValueImpl; import com.google.appengine.tools.pipeline.impl.backend.UpdateSpec; import com.google.appengine.tools.pipeline.impl.model.JobRecord; +import com.google.appengine.tools.pipeline.impl.model.Slot; import java.io.Serializable; import java.util.List; @@ -385,6 +386,15 @@ public PromisedValue newPromise() { updateSpec.getNonTransactionalGroup().includeSlot(promisedValue.getSlot()); return promisedValue; } + + public PromisedValue promise(String promiseHandle) { + Slot slot = PipelineManager.getPromisedValueSlot(promiseHandle); + PromisedValueImpl promisedValue = null; + if (slot != null) { + promisedValue = new PromisedValueImpl(slot); + } + return promisedValue; + } /** * Invoke this method from within the {@code run} method of a generator diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java index 4e80f575..830cf28a 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java @@ -473,6 +473,67 @@ public static void acceptPromisedValue(String promiseHandle, Object value) backEnd.save(updateSpec, generatorJob.getQueueSettings()); } + public static Slot getPromisedValueSlot(String promiseHandle) { + Slot slot = null; + try { + checkNonEmpty(promiseHandle, "promiseHandle"); + Key key = KeyFactory.stringToKey(promiseHandle); + // It is possible, though unlikely, that we might be asked to accept a + // promise before the slot to hold the promise has been saved. We will try 5 + // times, sleeping 1, 2, 4, 8 seconds between attempts. + int attempts = 0; + boolean interrupted = false; + try { + while (slot == null) { + attempts++; + try { + slot = backEnd.querySlot(key, false); + } catch (NoSuchObjectException e) { + if (attempts >= 5) { + throw new NoSuchObjectException("There is no promise with handle " + promiseHandle); + } + try { + Thread.sleep((long) Math.pow(2.0, attempts - 1) * 1000L); + } catch (InterruptedException f) { + interrupted = true; + } + } + } + } finally { + // TODO(user): replace with Uninterruptibles#sleepUninterruptibly once we use guava + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + Key generatorJobKey = slot.getGeneratorJobKey(); + if (null == generatorJobKey) { + throw new RuntimeException( + "Pipeline is fatally corrupted. Slot for promised value has no generatorJobKey: " + slot); + } + JobRecord generatorJob = backEnd.queryJob(generatorJobKey, JobRecord.InflationType.NONE); + if (null == generatorJob) { + throw new RuntimeException("Pipeline is fatally corrupted. " + + "The generator job for a promised value slot was not found: " + generatorJobKey); + } + String childGraphGuid = generatorJob.getChildGraphGuid(); + if (null == childGraphGuid) { + // The generator job has not been saved with a childGraphGuid yet. This can happen if the + // promise handle leaked out to an external thread before the job that generated it + // had finished. + throw new NoSuchObjectException( + "The framework is not ready to accept the promised value yet. " + + "Please try again after the job that generated the promis handle has completed."); + } + if (!childGraphGuid.equals(slot.getGraphGuid())) { + // The slot has been orphaned + throw new OrphanedObjectException(promiseHandle); + } + } catch (Exception e) { + logger.log(Level.WARNING, "Find slot for promise with handle " + promiseHandle + " failed.", e); + } + return slot; + } + /** * The root job instance used to wrap a user provided root if the user * provided root job has exceptionHandler specified. diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java index 06977f21..b0a13566 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java @@ -29,6 +29,10 @@ */ public class PromisedValueImpl extends FutureValueImpl implements PromisedValue { + public PromisedValueImpl(Slot slot) { + super(slot); + } + public PromisedValueImpl(Key rootJobGuid, Key generatorJobKey, String graphGUID) { super(new Slot(rootJobGuid, generatorJobKey, graphGUID)); } From 9b43be5f785c154dc31d40e70044bec9dc6da50b Mon Sep 17 00:00:00 2001 From: billy1380 Date: Fri, 13 Feb 2015 12:30:47 +0000 Subject: [PATCH 2/9] fixed constructor to call this instead of super --- .../google/appengine/tools/pipeline/impl/PromisedValueImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java index b0a13566..e465d7bb 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java @@ -34,7 +34,7 @@ public PromisedValueImpl(Slot slot) { } public PromisedValueImpl(Key rootJobGuid, Key generatorJobKey, String graphGUID) { - super(new Slot(rootJobGuid, generatorJobKey, graphGUID)); + this(new Slot(rootJobGuid, generatorJobKey, graphGUID)); } @Override From 202d8f278416f8d2ead6f5745121ca9ab1247393 Mon Sep 17 00:00:00 2001 From: billy1380 Date: Fri, 13 Feb 2015 12:43:17 +0000 Subject: [PATCH 3/9] added method queryPromisedValueSlot that is used from getPromisedValueSlot and acceptPromisedValue --- .../tools/pipeline/impl/PipelineManager.java | 144 ++++++++---------- 1 file changed, 61 insertions(+), 83 deletions(-) diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java index 830cf28a..758aed4a 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java @@ -24,6 +24,7 @@ import com.google.appengine.tools.pipeline.JobSetting; import com.google.appengine.tools.pipeline.NoSuchObjectException; import com.google.appengine.tools.pipeline.OrphanedObjectException; +import com.google.appengine.tools.pipeline.PromisedValue; import com.google.appengine.tools.pipeline.Value; import com.google.appengine.tools.pipeline.impl.backend.AppEngineBackEnd; import com.google.appengine.tools.pipeline.impl.backend.PipelineBackEnd; @@ -413,38 +414,24 @@ public static void deletePipelineRecords(String pipelineHandle, boolean force, b backEnd.deletePipeline(key, force, async); } + /** + * Fills a the {@link PromisedValue}. + * @param promiseHandle the key to the promised value slot. + * @param value the value to fill. + * @throws NoSuchObjectException If there is no Job with the given key. + * @throws OrphanedObjectException If the slot has been orphaned. + */ public static void acceptPromisedValue(String promiseHandle, Object value) throws NoSuchObjectException, OrphanedObjectException { - checkNonEmpty(promiseHandle, "promiseHandle"); - Key key = KeyFactory.stringToKey(promiseHandle); - Slot slot = null; - // It is possible, though unlikely, that we might be asked to accept a - // promise before the slot to hold the promise has been saved. We will try 5 - // times, sleeping 1, 2, 4, 8 seconds between attempts. - int attempts = 0; - boolean interrupted = false; - try { - while (slot == null) { - attempts++; - try { - slot = backEnd.querySlot(key, false); - } catch (NoSuchObjectException e) { - if (attempts >= 5) { - throw new NoSuchObjectException("There is no promise with handle " + promiseHandle); - } - try { - Thread.sleep((long) Math.pow(2.0, attempts - 1) * 1000L); - } catch (InterruptedException f) { - interrupted = true; - } - } - } - } finally { - // TODO(user): replace with Uninterruptibles#sleepUninterruptibly once we use guava - if (interrupted) { - Thread.currentThread().interrupt(); - } - } + Slot slot = queryPromisedValueSlot(promiseHandle); + JobRecord generatorJob = getGeneratorJob(slot); + UpdateSpec updateSpec = new UpdateSpec(slot.getRootJobKey()); + registerSlotFilled(updateSpec, generatorJob.getQueueSettings(), slot, value); + backEnd.save(updateSpec, generatorJob.getQueueSettings()); + } + + private static JobRecord getGeneratorJob(Slot slot) + throws OrphanedObjectException, NoSuchObjectException { Key generatorJobKey = slot.getGeneratorJobKey(); if (null == generatorJobKey) { throw new RuntimeException( @@ -466,70 +453,61 @@ public static void acceptPromisedValue(String promiseHandle, Object value) } if (!childGraphGuid.equals(slot.getGraphGuid())) { // The slot has been orphaned - throw new OrphanedObjectException(promiseHandle); + throw new OrphanedObjectException(KeyFactory.keyToString(slot.getKey())); } - UpdateSpec updateSpec = new UpdateSpec(slot.getRootJobKey()); - registerSlotFilled(updateSpec, generatorJob.getQueueSettings(), slot, value); - backEnd.save(updateSpec, generatorJob.getQueueSettings()); + + return generatorJob; } + /** + * Get the {@link Slot} for a promise handle that can be used to construct a + * {@link PromisedValue} for use in child {@link Job}s. It will attempt to lookup + * the key 5 times with an exponential back-off just in case it has been requested + * before the promise has been registered. + * @param promiseHandle Key to find the {@link Slot} + * @return A {@link Slot} if the handle can be resolved otherwise null. + */ public static Slot getPromisedValueSlot(String promiseHandle) { Slot slot = null; try { - checkNonEmpty(promiseHandle, "promiseHandle"); - Key key = KeyFactory.stringToKey(promiseHandle); - // It is possible, though unlikely, that we might be asked to accept a - // promise before the slot to hold the promise has been saved. We will try 5 - // times, sleeping 1, 2, 4, 8 seconds between attempts. - int attempts = 0; - boolean interrupted = false; - try { - while (slot == null) { - attempts++; + slot = queryPromisedValueSlot(promiseHandle); + } catch (Exception e) { + logger.log(Level.WARNING, "Find slot for promise with handle " + promiseHandle + " failed.", e); + } + return slot; + } + + private static Slot queryPromisedValueSlot(String promiseHandle) + throws NoSuchObjectException { + Slot slot = null; + checkNonEmpty(promiseHandle, "promiseHandle"); + Key key = KeyFactory.stringToKey(promiseHandle); + // It is possible, though unlikely, that we might be asked to accept a + // promise before the slot to hold the promise has been saved. We will try 5 + // times, sleeping 1, 2, 4, 8 seconds between attempts. + int attempts = 0; + boolean interrupted = false; + try { + while (slot == null) { + attempts++; + try { + slot = backEnd.querySlot(key, false); + } catch (NoSuchObjectException e) { + if (attempts >= 5) { + throw new NoSuchObjectException("There is no promise with handle " + promiseHandle); + } try { - slot = backEnd.querySlot(key, false); - } catch (NoSuchObjectException e) { - if (attempts >= 5) { - throw new NoSuchObjectException("There is no promise with handle " + promiseHandle); - } - try { - Thread.sleep((long) Math.pow(2.0, attempts - 1) * 1000L); - } catch (InterruptedException f) { - interrupted = true; - } + Thread.sleep((long) Math.pow(2.0, attempts - 1) * 1000L); + } catch (InterruptedException f) { + interrupted = true; } } - } finally { - // TODO(user): replace with Uninterruptibles#sleepUninterruptibly once we use guava - if (interrupted) { - Thread.currentThread().interrupt(); - } } - Key generatorJobKey = slot.getGeneratorJobKey(); - if (null == generatorJobKey) { - throw new RuntimeException( - "Pipeline is fatally corrupted. Slot for promised value has no generatorJobKey: " + slot); - } - JobRecord generatorJob = backEnd.queryJob(generatorJobKey, JobRecord.InflationType.NONE); - if (null == generatorJob) { - throw new RuntimeException("Pipeline is fatally corrupted. " - + "The generator job for a promised value slot was not found: " + generatorJobKey); - } - String childGraphGuid = generatorJob.getChildGraphGuid(); - if (null == childGraphGuid) { - // The generator job has not been saved with a childGraphGuid yet. This can happen if the - // promise handle leaked out to an external thread before the job that generated it - // had finished. - throw new NoSuchObjectException( - "The framework is not ready to accept the promised value yet. " - + "Please try again after the job that generated the promis handle has completed."); - } - if (!childGraphGuid.equals(slot.getGraphGuid())) { - // The slot has been orphaned - throw new OrphanedObjectException(promiseHandle); + } finally { + // TODO(user): replace with Uninterruptibles#sleepUninterruptibly once we use guava + if (interrupted) { + Thread.currentThread().interrupt(); } - } catch (Exception e) { - logger.log(Level.WARNING, "Find slot for promise with handle " + promiseHandle + " failed.", e); } return slot; } From c9ad75f6faeb3dc80585e00aa15fe054481424d8 Mon Sep 17 00:00:00 2001 From: billy1380 Date: Fri, 13 Feb 2015 12:44:02 +0000 Subject: [PATCH 4/9] added javadoc to newPromise and promise methods --- .../google/appengine/tools/pipeline/Job.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/Job.java b/java/src/main/java/com/google/appengine/tools/pipeline/Job.java index ea7fe7e4..5e6e9c3f 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/Job.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/Job.java @@ -380,6 +380,18 @@ public PromisedValue newPromise(Class klass) { return promisedValue; } + /** + * Invoke this method from within the {@code run} method of a generator + * job in order to declare that some value will be provided asynchronously + * by some external agent. + * + * @param The type of the asynchronously provided value. + * @return A {@code PromisedValue} that represents an empty value slot that + * will be filled at a later time when the external agent invokes + * {@link PipelineService#submitPromisedValue(String, Object)}. This + * may be passed in to further invocations of {@code futureCall()} in + * order to specify a data dependency. + */ public PromisedValue newPromise() { PromisedValueImpl promisedValue = new PromisedValueImpl<>(getPipelineKey(), thisJobRecord.getKey(), currentRunGUID); @@ -387,6 +399,25 @@ public PromisedValue newPromise() { return promisedValue; } + /** + * Invoke this method from within the {@code run} method of a generator + * job in order to get some value that has been previously declared and + * that will be provided asynchronously by some external agent. This can be + * used to share the same value with child {@link Job}s + * + * + * @param promiseHandle The unique identifier for the {@link PromisedValue} + * obtained during the execution of some job via the method + * {@link PromisedValue#getHandle()}. + * @return A {@code PromisedValue} that represents an empty value slot that + * will be filled at a later time when the external agent invokes + * {@link PipelineService#submitPromisedValue(String, Object)}. This + * may be passed in to further invocations of {@code futureCall()} in + * order to specify a data dependency. + * + * Note: This method will return null if the slot for the + * handle could not be found. + */ public PromisedValue promise(String promiseHandle) { Slot slot = PipelineManager.getPromisedValueSlot(promiseHandle); PromisedValueImpl promisedValue = null; From 99db1fcd9c6db80bc45905613a11edcbaedc6eed Mon Sep 17 00:00:00 2001 From: billy1380 Date: Sat, 9 May 2015 12:38:43 +0100 Subject: [PATCH 5/9] removed double line spacing in javadoc --- java/src/main/java/com/google/appengine/tools/pipeline/Job.java | 1 - 1 file changed, 1 deletion(-) diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/Job.java b/java/src/main/java/com/google/appengine/tools/pipeline/Job.java index 5e6e9c3f..8933d573 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/Job.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/Job.java @@ -405,7 +405,6 @@ public PromisedValue newPromise() { * that will be provided asynchronously by some external agent. This can be * used to share the same value with child {@link Job}s * - * * @param promiseHandle The unique identifier for the {@link PromisedValue} * obtained during the execution of some job via the method * {@link PromisedValue#getHandle()}. From eba564226f6c279f6eff3c5de9dd23c9d84f16b6 Mon Sep 17 00:00:00 2001 From: billy1380 Date: Sat, 9 May 2015 12:42:57 +0100 Subject: [PATCH 6/9] removed note about method returning null and added to the rest of the return docs --- .../main/java/com/google/appengine/tools/pipeline/Job.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/Job.java b/java/src/main/java/com/google/appengine/tools/pipeline/Job.java index 8933d573..b29e7977 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/Job.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/Job.java @@ -412,10 +412,8 @@ public PromisedValue newPromise() { * will be filled at a later time when the external agent invokes * {@link PipelineService#submitPromisedValue(String, Object)}. This * may be passed in to further invocations of {@code futureCall()} in - * order to specify a data dependency. - * - * Note: This method will return null if the slot for the - * handle could not be found. + * order to specify a data dependency. This method will return + * null if the slot for the handle could not be found. */ public PromisedValue promise(String promiseHandle) { Slot slot = PipelineManager.getPromisedValueSlot(promiseHandle); From 9e418c2411a2de0189e55a7b070deaa14c4669f7 Mon Sep 17 00:00:00 2001 From: billy1380 Date: Sat, 9 May 2015 15:12:59 +0100 Subject: [PATCH 7/9] added 3 tests for unfilled promise, filled promise and non-existent promise --- .../tools/pipeline/MiscPipelineTest.java | 91 +++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java b/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java index 2e6a19d8..a3cd08f4 100644 --- a/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java +++ b/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java @@ -644,4 +644,95 @@ public Value run(long[] bytes) { return immediate(bytes.length); } } + + public void testUnfilledPromiseFromHandle() throws Exception { + PipelineService service = PipelineServiceFactory.newPipelineService(); + String pipelineId = service + .startNewPipeline(new UnfilledPromiseFromHandleParentJob()); + String value = waitForJobToComplete(pipelineId); + assertEquals("1323", value); + } + + @SuppressWarnings("serial") + private static class UnfilledPromiseFromHandleParentJob extends Job0 { + + @Override + public Value run() throws Exception { + PromisedValue promise = newPromise(); + FutureValue child1 = futureCall(new PromiseDelegateJob(), + immediate("1"), immediate(promise.getHandle())); + FutureValue child2 = futureCall(new PromiseDelegateJob(), + immediate("2"), immediate(promise.getHandle())); + futureCall(new FillPromiseJob(), immediate("3"), + immediate(promise.getHandle())); + FutureValue child4 = futureCall(new StringAdderJob(), child1, + child2); + return child4; + } + } + + @SuppressWarnings("serial") + private static class PromiseDelegateJob extends Job2 { + @Override + public Value run(String value, String handle) { + PromisedValue promise = promise(handle); + FutureValue added = futureCall(new StringAdderJob(), + immediate(value), promise); + return added; + } + } + + @SuppressWarnings("serial") + private static class StringAdderJob extends Job2 { + @Override + public Value run(String value1, String value2) { + return immediate(value1 + value2); + } + } + + public void testFilledPromiseFromHandle() throws Exception { + PipelineService service = PipelineServiceFactory.newPipelineService(); + String pipelineId = service + .startNewPipeline(new FilledPromiseFromHandleParentJob()); + String value = waitForJobToComplete(pipelineId); + assertEquals("1323", value); + } + + @SuppressWarnings("serial") + private static class FilledPromiseFromHandleParentJob extends Job0 { + + @Override + public Value run() throws Exception { + PromisedValue promise = newPromise(); + waitFor(futureCall(new FillPromiseJob(), immediate("3"), + immediate(promise.getHandle()))); + + FutureValue child1 = futureCall(new PromiseDelegateJob(), + immediate("1"), immediate(promise.getHandle())); + FutureValue child2 = futureCall(new PromiseDelegateJob(), + immediate("2"), immediate(promise.getHandle())); + + return futureCall(new StringAdderJob(), child1, child2); + } + } + + public void testPromiseFromNonExistentHandle() throws Exception { + PipelineService service = PipelineServiceFactory.newPipelineService(); + String pipelineId = service + .startNewPipeline(new PromiseFromNonExistentHandleParentJob()); + String value = waitForJobToComplete(pipelineId); + assertNull(value); + } + + @SuppressWarnings("serial") + private static class PromiseFromNonExistentHandleParentJob extends + Job0 { + + @Override + public Value run() throws Exception { + PromisedValue promise = promise("SOME_NON_HANDLE"); + return immediate(promise == null ? null : "This is odd!"); + } + } + } From 32321eb4abbd8e57fbd7697c1dd8ba5c93a20b81 Mon Sep 17 00:00:00 2001 From: billy1380 Date: Mon, 11 May 2015 21:25:49 +0100 Subject: [PATCH 8/9] - used java 7 syntax for declaring generic - updated javadoc language to highlight ansestor constraint for promises --- .../java/com/google/appengine/tools/pipeline/Job.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/Job.java b/java/src/main/java/com/google/appengine/tools/pipeline/Job.java index b29e7977..77f074a8 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/Job.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/Job.java @@ -401,9 +401,9 @@ public PromisedValue newPromise() { /** * Invoke this method from within the {@code run} method of a generator - * job in order to get some value that has been previously declared and - * that will be provided asynchronously by some external agent. This can be - * used to share the same value with child {@link Job}s + * job in order to get a promised value that was created by an ancestor + * job that will be provided asynchronously by some external agent. This can + * be used to share the same value with child {@link Job}s * * @param promiseHandle The unique identifier for the {@link PromisedValue} * obtained during the execution of some job via the method @@ -419,7 +419,7 @@ public PromisedValue promise(String promiseHandle) { Slot slot = PipelineManager.getPromisedValueSlot(promiseHandle); PromisedValueImpl promisedValue = null; if (slot != null) { - promisedValue = new PromisedValueImpl(slot); + promisedValue = new PromisedValueImpl<>(slot); } return promisedValue; } From 79c02c3f075753fd507e95ea0d57fd7c24a445ce Mon Sep 17 00:00:00 2001 From: billy1380 Date: Tue, 12 May 2015 09:35:17 +0100 Subject: [PATCH 9/9] - reformatted code for 100 char line - added wait setting to subsequent jobs in testFilledPromiseFromHandle - return boolean from PromiseFromNonExistentHandleParentJob and asserting true --- .../tools/pipeline/MiscPipelineTest.java | 46 ++++++++----------- 1 file changed, 20 insertions(+), 26 deletions(-) diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java b/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java index a3cd08f4..4ce9ec38 100644 --- a/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java +++ b/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java @@ -16,6 +16,7 @@ import com.google.appengine.tools.pipeline.JobInfo.State; import com.google.appengine.tools.pipeline.JobSetting.StatusConsoleUrl; +import com.google.appengine.tools.pipeline.JobSetting.WaitForSetting; import com.google.appengine.tools.pipeline.impl.PipelineManager; import com.google.appengine.tools.pipeline.impl.model.JobRecord; import com.google.appengine.tools.pipeline.impl.model.PipelineObjects; @@ -647,8 +648,7 @@ public Value run(long[] bytes) { public void testUnfilledPromiseFromHandle() throws Exception { PipelineService service = PipelineServiceFactory.newPipelineService(); - String pipelineId = service - .startNewPipeline(new UnfilledPromiseFromHandleParentJob()); + String pipelineId = service.startNewPipeline(new UnfilledPromiseFromHandleParentJob()); String value = waitForJobToComplete(pipelineId); assertEquals("1323", value); } @@ -659,14 +659,12 @@ private static class UnfilledPromiseFromHandleParentJob extends Job0 { @Override public Value run() throws Exception { PromisedValue promise = newPromise(); - FutureValue child1 = futureCall(new PromiseDelegateJob(), - immediate("1"), immediate(promise.getHandle())); - FutureValue child2 = futureCall(new PromiseDelegateJob(), - immediate("2"), immediate(promise.getHandle())); - futureCall(new FillPromiseJob(), immediate("3"), + FutureValue child1 = futureCall(new PromiseDelegateJob(), immediate("1"), immediate(promise.getHandle())); - FutureValue child4 = futureCall(new StringAdderJob(), child1, - child2); + FutureValue child2 = futureCall(new PromiseDelegateJob(), immediate("2"), + immediate(promise.getHandle())); + futureCall(new FillPromiseJob(), immediate("3"), immediate(promise.getHandle())); + FutureValue child4 = futureCall(new StringAdderJob(), child1, child2); return child4; } } @@ -676,8 +674,7 @@ private static class PromiseDelegateJob extends Job2 { @Override public Value run(String value, String handle) { PromisedValue promise = promise(handle); - FutureValue added = futureCall(new StringAdderJob(), - immediate(value), promise); + FutureValue added = futureCall(new StringAdderJob(), immediate(value), promise); return added; } } @@ -692,8 +689,7 @@ public Value run(String value1, String value2) { public void testFilledPromiseFromHandle() throws Exception { PipelineService service = PipelineServiceFactory.newPipelineService(); - String pipelineId = service - .startNewPipeline(new FilledPromiseFromHandleParentJob()); + String pipelineId = service.startNewPipeline(new FilledPromiseFromHandleParentJob()); String value = waitForJobToComplete(pipelineId); assertEquals("1323", value); } @@ -704,13 +700,13 @@ private static class FilledPromiseFromHandleParentJob extends Job0 { @Override public Value run() throws Exception { PromisedValue promise = newPromise(); - waitFor(futureCall(new FillPromiseJob(), immediate("3"), + WaitForSetting wait = waitFor(futureCall(new FillPromiseJob(), immediate("3"), immediate(promise.getHandle()))); - FutureValue child1 = futureCall(new PromiseDelegateJob(), - immediate("1"), immediate(promise.getHandle())); - FutureValue child2 = futureCall(new PromiseDelegateJob(), - immediate("2"), immediate(promise.getHandle())); + FutureValue child1 = futureCall(new PromiseDelegateJob(), immediate("1"), + immediate(promise.getHandle()), wait); + FutureValue child2 = futureCall(new PromiseDelegateJob(), immediate("2"), + immediate(promise.getHandle()), wait); return futureCall(new StringAdderJob(), child1, child2); } @@ -718,20 +714,18 @@ public Value run() throws Exception { public void testPromiseFromNonExistentHandle() throws Exception { PipelineService service = PipelineServiceFactory.newPipelineService(); - String pipelineId = service - .startNewPipeline(new PromiseFromNonExistentHandleParentJob()); - String value = waitForJobToComplete(pipelineId); - assertNull(value); + String pipelineId = service.startNewPipeline(new PromiseFromNonExistentHandleParentJob()); + Boolean value = waitForJobToComplete(pipelineId); + assertTrue(value); } @SuppressWarnings("serial") - private static class PromiseFromNonExistentHandleParentJob extends - Job0 { + private static class PromiseFromNonExistentHandleParentJob extends Job0 { @Override - public Value run() throws Exception { + public Value run() throws Exception { PromisedValue promise = promise("SOME_NON_HANDLE"); - return immediate(promise == null ? null : "This is odd!"); + return immediate(promise == null); } }