diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/Job.java b/java/src/main/java/com/google/appengine/tools/pipeline/Job.java index 3e6ee155..77f074a8 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/Job.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/Job.java @@ -20,6 +20,7 @@ import com.google.appengine.tools.pipeline.impl.PromisedValueImpl; import com.google.appengine.tools.pipeline.impl.backend.UpdateSpec; import com.google.appengine.tools.pipeline.impl.model.JobRecord; +import com.google.appengine.tools.pipeline.impl.model.Slot; import java.io.Serializable; import java.util.List; @@ -379,12 +380,49 @@ public PromisedValue newPromise(Class klass) { return promisedValue; } + /** + * Invoke this method from within the {@code run} method of a generator + * job in order to declare that some value will be provided asynchronously + * by some external agent. + * + * @param The type of the asynchronously provided value. + * @return A {@code PromisedValue} that represents an empty value slot that + * will be filled at a later time when the external agent invokes + * {@link PipelineService#submitPromisedValue(String, Object)}. This + * may be passed in to further invocations of {@code futureCall()} in + * order to specify a data dependency. + */ public PromisedValue newPromise() { PromisedValueImpl promisedValue = new PromisedValueImpl<>(getPipelineKey(), thisJobRecord.getKey(), currentRunGUID); updateSpec.getNonTransactionalGroup().includeSlot(promisedValue.getSlot()); return promisedValue; } + + /** + * Invoke this method from within the {@code run} method of a generator + * job in order to get a promised value that was created by an ancestor + * job that will be provided asynchronously by some external agent. This can + * be used to share the same value with child {@link Job}s + * + * @param promiseHandle The unique identifier for the {@link PromisedValue} + * obtained during the execution of some job via the method + * {@link PromisedValue#getHandle()}. + * @return A {@code PromisedValue} that represents an empty value slot that + * will be filled at a later time when the external agent invokes + * {@link PipelineService#submitPromisedValue(String, Object)}. This + * may be passed in to further invocations of {@code futureCall()} in + * order to specify a data dependency. This method will return + * null if the slot for the handle could not be found. + */ + public PromisedValue promise(String promiseHandle) { + Slot slot = PipelineManager.getPromisedValueSlot(promiseHandle); + PromisedValueImpl promisedValue = null; + if (slot != null) { + promisedValue = new PromisedValueImpl<>(slot); + } + return promisedValue; + } /** * Invoke this method from within the {@code run} method of a generator diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java index 4e80f575..758aed4a 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java @@ -24,6 +24,7 @@ import com.google.appengine.tools.pipeline.JobSetting; import com.google.appengine.tools.pipeline.NoSuchObjectException; import com.google.appengine.tools.pipeline.OrphanedObjectException; +import com.google.appengine.tools.pipeline.PromisedValue; import com.google.appengine.tools.pipeline.Value; import com.google.appengine.tools.pipeline.impl.backend.AppEngineBackEnd; import com.google.appengine.tools.pipeline.impl.backend.PipelineBackEnd; @@ -413,11 +414,74 @@ public static void deletePipelineRecords(String pipelineHandle, boolean force, b backEnd.deletePipeline(key, force, async); } + /** + * Fills a the {@link PromisedValue}. + * @param promiseHandle the key to the promised value slot. + * @param value the value to fill. + * @throws NoSuchObjectException If there is no Job with the given key. + * @throws OrphanedObjectException If the slot has been orphaned. + */ public static void acceptPromisedValue(String promiseHandle, Object value) throws NoSuchObjectException, OrphanedObjectException { + Slot slot = queryPromisedValueSlot(promiseHandle); + JobRecord generatorJob = getGeneratorJob(slot); + UpdateSpec updateSpec = new UpdateSpec(slot.getRootJobKey()); + registerSlotFilled(updateSpec, generatorJob.getQueueSettings(), slot, value); + backEnd.save(updateSpec, generatorJob.getQueueSettings()); + } + + private static JobRecord getGeneratorJob(Slot slot) + throws OrphanedObjectException, NoSuchObjectException { + Key generatorJobKey = slot.getGeneratorJobKey(); + if (null == generatorJobKey) { + throw new RuntimeException( + "Pipeline is fatally corrupted. Slot for promised value has no generatorJobKey: " + slot); + } + JobRecord generatorJob = backEnd.queryJob(generatorJobKey, JobRecord.InflationType.NONE); + if (null == generatorJob) { + throw new RuntimeException("Pipeline is fatally corrupted. " + + "The generator job for a promised value slot was not found: " + generatorJobKey); + } + String childGraphGuid = generatorJob.getChildGraphGuid(); + if (null == childGraphGuid) { + // The generator job has not been saved with a childGraphGuid yet. This can happen if the + // promise handle leaked out to an external thread before the job that generated it + // had finished. + throw new NoSuchObjectException( + "The framework is not ready to accept the promised value yet. " + + "Please try again after the job that generated the promis handle has completed."); + } + if (!childGraphGuid.equals(slot.getGraphGuid())) { + // The slot has been orphaned + throw new OrphanedObjectException(KeyFactory.keyToString(slot.getKey())); + } + + return generatorJob; + } + + /** + * Get the {@link Slot} for a promise handle that can be used to construct a + * {@link PromisedValue} for use in child {@link Job}s. It will attempt to lookup + * the key 5 times with an exponential back-off just in case it has been requested + * before the promise has been registered. + * @param promiseHandle Key to find the {@link Slot} + * @return A {@link Slot} if the handle can be resolved otherwise null. + */ + public static Slot getPromisedValueSlot(String promiseHandle) { + Slot slot = null; + try { + slot = queryPromisedValueSlot(promiseHandle); + } catch (Exception e) { + logger.log(Level.WARNING, "Find slot for promise with handle " + promiseHandle + " failed.", e); + } + return slot; + } + + private static Slot queryPromisedValueSlot(String promiseHandle) + throws NoSuchObjectException { + Slot slot = null; checkNonEmpty(promiseHandle, "promiseHandle"); Key key = KeyFactory.stringToKey(promiseHandle); - Slot slot = null; // It is possible, though unlikely, that we might be asked to accept a // promise before the slot to hold the promise has been saved. We will try 5 // times, sleeping 1, 2, 4, 8 seconds between attempts. @@ -445,34 +509,9 @@ public static void acceptPromisedValue(String promiseHandle, Object value) Thread.currentThread().interrupt(); } } - Key generatorJobKey = slot.getGeneratorJobKey(); - if (null == generatorJobKey) { - throw new RuntimeException( - "Pipeline is fatally corrupted. Slot for promised value has no generatorJobKey: " + slot); - } - JobRecord generatorJob = backEnd.queryJob(generatorJobKey, JobRecord.InflationType.NONE); - if (null == generatorJob) { - throw new RuntimeException("Pipeline is fatally corrupted. " - + "The generator job for a promised value slot was not found: " + generatorJobKey); - } - String childGraphGuid = generatorJob.getChildGraphGuid(); - if (null == childGraphGuid) { - // The generator job has not been saved with a childGraphGuid yet. This can happen if the - // promise handle leaked out to an external thread before the job that generated it - // had finished. - throw new NoSuchObjectException( - "The framework is not ready to accept the promised value yet. " - + "Please try again after the job that generated the promis handle has completed."); - } - if (!childGraphGuid.equals(slot.getGraphGuid())) { - // The slot has been orphaned - throw new OrphanedObjectException(promiseHandle); - } - UpdateSpec updateSpec = new UpdateSpec(slot.getRootJobKey()); - registerSlotFilled(updateSpec, generatorJob.getQueueSettings(), slot, value); - backEnd.save(updateSpec, generatorJob.getQueueSettings()); + return slot; } - + /** * The root job instance used to wrap a user provided root if the user * provided root job has exceptionHandler specified. diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java index 06977f21..e465d7bb 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java @@ -29,8 +29,12 @@ */ public class PromisedValueImpl extends FutureValueImpl implements PromisedValue { + public PromisedValueImpl(Slot slot) { + super(slot); + } + public PromisedValueImpl(Key rootJobGuid, Key generatorJobKey, String graphGUID) { - super(new Slot(rootJobGuid, generatorJobKey, graphGUID)); + this(new Slot(rootJobGuid, generatorJobKey, graphGUID)); } @Override diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java b/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java index 2e6a19d8..4ce9ec38 100644 --- a/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java +++ b/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java @@ -16,6 +16,7 @@ import com.google.appengine.tools.pipeline.JobInfo.State; import com.google.appengine.tools.pipeline.JobSetting.StatusConsoleUrl; +import com.google.appengine.tools.pipeline.JobSetting.WaitForSetting; import com.google.appengine.tools.pipeline.impl.PipelineManager; import com.google.appengine.tools.pipeline.impl.model.JobRecord; import com.google.appengine.tools.pipeline.impl.model.PipelineObjects; @@ -644,4 +645,88 @@ public Value run(long[] bytes) { return immediate(bytes.length); } } + + public void testUnfilledPromiseFromHandle() throws Exception { + PipelineService service = PipelineServiceFactory.newPipelineService(); + String pipelineId = service.startNewPipeline(new UnfilledPromiseFromHandleParentJob()); + String value = waitForJobToComplete(pipelineId); + assertEquals("1323", value); + } + + @SuppressWarnings("serial") + private static class UnfilledPromiseFromHandleParentJob extends Job0 { + + @Override + public Value run() throws Exception { + PromisedValue promise = newPromise(); + FutureValue child1 = futureCall(new PromiseDelegateJob(), immediate("1"), + immediate(promise.getHandle())); + FutureValue child2 = futureCall(new PromiseDelegateJob(), immediate("2"), + immediate(promise.getHandle())); + futureCall(new FillPromiseJob(), immediate("3"), immediate(promise.getHandle())); + FutureValue child4 = futureCall(new StringAdderJob(), child1, child2); + return child4; + } + } + + @SuppressWarnings("serial") + private static class PromiseDelegateJob extends Job2 { + @Override + public Value run(String value, String handle) { + PromisedValue promise = promise(handle); + FutureValue added = futureCall(new StringAdderJob(), immediate(value), promise); + return added; + } + } + + @SuppressWarnings("serial") + private static class StringAdderJob extends Job2 { + @Override + public Value run(String value1, String value2) { + return immediate(value1 + value2); + } + } + + public void testFilledPromiseFromHandle() throws Exception { + PipelineService service = PipelineServiceFactory.newPipelineService(); + String pipelineId = service.startNewPipeline(new FilledPromiseFromHandleParentJob()); + String value = waitForJobToComplete(pipelineId); + assertEquals("1323", value); + } + + @SuppressWarnings("serial") + private static class FilledPromiseFromHandleParentJob extends Job0 { + + @Override + public Value run() throws Exception { + PromisedValue promise = newPromise(); + WaitForSetting wait = waitFor(futureCall(new FillPromiseJob(), immediate("3"), + immediate(promise.getHandle()))); + + FutureValue child1 = futureCall(new PromiseDelegateJob(), immediate("1"), + immediate(promise.getHandle()), wait); + FutureValue child2 = futureCall(new PromiseDelegateJob(), immediate("2"), + immediate(promise.getHandle()), wait); + + return futureCall(new StringAdderJob(), child1, child2); + } + } + + public void testPromiseFromNonExistentHandle() throws Exception { + PipelineService service = PipelineServiceFactory.newPipelineService(); + String pipelineId = service.startNewPipeline(new PromiseFromNonExistentHandleParentJob()); + Boolean value = waitForJobToComplete(pipelineId); + assertTrue(value); + } + + @SuppressWarnings("serial") + private static class PromiseFromNonExistentHandleParentJob extends Job0 { + + @Override + public Value run() throws Exception { + PromisedValue promise = promise("SOME_NON_HANDLE"); + return immediate(promise == null); + } + } + }