-
Notifications
You must be signed in to change notification settings - Fork 61
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
added method to job to get PromiseValue using handle #14
Changes from 4 commits
b0e27bf
9b43be5
202d8f2
c9ad75f
f6821af
99db1fc
eba5642
9e418c2
32321eb
79c02c3
7349c00
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
import com.google.appengine.tools.pipeline.impl.PromisedValueImpl; | ||
import com.google.appengine.tools.pipeline.impl.backend.UpdateSpec; | ||
import com.google.appengine.tools.pipeline.impl.model.JobRecord; | ||
import com.google.appengine.tools.pipeline.impl.model.Slot; | ||
|
||
import java.io.Serializable; | ||
import java.util.List; | ||
|
@@ -379,12 +380,52 @@ public <F> PromisedValue<F> newPromise(Class<F> klass) { | |
return promisedValue; | ||
} | ||
|
||
/** | ||
* Invoke this method from within the {@code run} method of a <b>generator | ||
* job</b> in order to declare that some value will be provided asynchronously | ||
* by some external agent. | ||
* | ||
* @param <F> The type of the asynchronously provided value. | ||
* @return A {@code PromisedValue} that represents an empty value slot that | ||
* will be filled at a later time when the external agent invokes | ||
* {@link PipelineService#submitPromisedValue(String, Object)}. This | ||
* may be passed in to further invocations of {@code futureCall()} in | ||
* order to specify a data dependency. | ||
*/ | ||
public <F> PromisedValue<F> newPromise() { | ||
PromisedValueImpl<F> promisedValue = | ||
new PromisedValueImpl<>(getPipelineKey(), thisJobRecord.getKey(), currentRunGUID); | ||
updateSpec.getNonTransactionalGroup().includeSlot(promisedValue.getSlot()); | ||
return promisedValue; | ||
} | ||
|
||
/** | ||
* Invoke this method from within the {@code run} method of a <b>generator | ||
* job</b> in order to get some value that has been previously declared and | ||
* that will be provided asynchronously by some external agent. This can be | ||
* used to share the same value with child {@link Job}s | ||
* | ||
* | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit, one empty line is enough. |
||
* @param promiseHandle The unique identifier for the {@link PromisedValue} | ||
* obtained during the execution of some job via the method | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we say "ancestor" job instead of "some" job? BTW, did you check what happens if called |
||
* {@link PromisedValue#getHandle()}. | ||
* @return A {@code PromisedValue} that represents an empty value slot that | ||
* will be filled at a later time when the external agent invokes | ||
* {@link PipelineService#submitPromisedValue(String, Object)}. This | ||
* may be passed in to further invocations of {@code futureCall()} in | ||
* order to specify a data dependency. | ||
* | ||
* Note: This method will return <code>null</code> if the slot for the | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Replace "Note:" with having it part of the @return information. |
||
* handle could not be found. | ||
*/ | ||
public <F> PromisedValue<F> promise(String promiseHandle) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. javadoc. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My use case is described in issue #13 and you are not far off. I have no intention of using the promised value across root jobs, so I have no problem with adding that as a restriction. Updates soon... |
||
Slot slot = PipelineManager.getPromisedValueSlot(promiseHandle); | ||
PromisedValueImpl<F> promisedValue = null; | ||
if (slot != null) { | ||
promisedValue = new PromisedValueImpl<F>(slot); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could use Java 7 based syntax: promisedValue = new PromisedValueImpl<>(slot); |
||
} | ||
return promisedValue; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should it really return a null if not found? In any case, document. |
||
} | ||
|
||
/** | ||
* Invoke this method from within the {@code run} method of a <b>generator | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ | |
import com.google.appengine.tools.pipeline.JobSetting; | ||
import com.google.appengine.tools.pipeline.NoSuchObjectException; | ||
import com.google.appengine.tools.pipeline.OrphanedObjectException; | ||
import com.google.appengine.tools.pipeline.PromisedValue; | ||
import com.google.appengine.tools.pipeline.Value; | ||
import com.google.appengine.tools.pipeline.impl.backend.AppEngineBackEnd; | ||
import com.google.appengine.tools.pipeline.impl.backend.PipelineBackEnd; | ||
|
@@ -413,11 +414,74 @@ public static void deletePipelineRecords(String pipelineHandle, boolean force, b | |
backEnd.deletePipeline(key, force, async); | ||
} | ||
|
||
/** | ||
* Fills a the {@link PromisedValue}. | ||
* @param promiseHandle the key to the promised value slot. | ||
* @param value the value to fill. | ||
* @throws NoSuchObjectException If there is no Job with the given key. | ||
* @throws OrphanedObjectException If the slot has been orphaned. | ||
*/ | ||
public static void acceptPromisedValue(String promiseHandle, Object value) | ||
throws NoSuchObjectException, OrphanedObjectException { | ||
Slot slot = queryPromisedValueSlot(promiseHandle); | ||
JobRecord generatorJob = getGeneratorJob(slot); | ||
UpdateSpec updateSpec = new UpdateSpec(slot.getRootJobKey()); | ||
registerSlotFilled(updateSpec, generatorJob.getQueueSettings(), slot, value); | ||
backEnd.save(updateSpec, generatorJob.getQueueSettings()); | ||
} | ||
|
||
private static JobRecord getGeneratorJob(Slot slot) | ||
throws OrphanedObjectException, NoSuchObjectException { | ||
Key generatorJobKey = slot.getGeneratorJobKey(); | ||
if (null == generatorJobKey) { | ||
throw new RuntimeException( | ||
"Pipeline is fatally corrupted. Slot for promised value has no generatorJobKey: " + slot); | ||
} | ||
JobRecord generatorJob = backEnd.queryJob(generatorJobKey, JobRecord.InflationType.NONE); | ||
if (null == generatorJob) { | ||
throw new RuntimeException("Pipeline is fatally corrupted. " | ||
+ "The generator job for a promised value slot was not found: " + generatorJobKey); | ||
} | ||
String childGraphGuid = generatorJob.getChildGraphGuid(); | ||
if (null == childGraphGuid) { | ||
// The generator job has not been saved with a childGraphGuid yet. This can happen if the | ||
// promise handle leaked out to an external thread before the job that generated it | ||
// had finished. | ||
throw new NoSuchObjectException( | ||
"The framework is not ready to accept the promised value yet. " | ||
+ "Please try again after the job that generated the promis handle has completed."); | ||
} | ||
if (!childGraphGuid.equals(slot.getGraphGuid())) { | ||
// The slot has been orphaned | ||
throw new OrphanedObjectException(KeyFactory.keyToString(slot.getKey())); | ||
} | ||
|
||
return generatorJob; | ||
} | ||
|
||
/** | ||
* Get the {@link Slot} for a promise handle that can be used to construct a | ||
* {@link PromisedValue} for use in child {@link Job}s. It will attempt to lookup | ||
* the key 5 times with an exponential back-off just in case it has been requested | ||
* before the promise has been registered. | ||
* @param promiseHandle Key to find the {@link Slot} | ||
* @return A {@link Slot} if the handle can be resolved otherwise <code>null</code>. | ||
*/ | ||
public static Slot getPromisedValueSlot(String promiseHandle) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Most of it is a copy/paste from acceptPromisedValue. Please refactor to make acceptPromisedValue call this method. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we really need this method? Can dealing with the exception be part of Job? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you are worried about masking the exception I can follow the example of acceptPromisedValue and throw NoSuchObjectException and OrphanedObjectException, but the method itself is required because queryPromisedValueSlot is private and I am not sure I should change it's scope. Edit (note): if I make it throw the exceptions then I will probably have to throw them in promise() which does not seem to be done by any other job methods. |
||
Slot slot = null; | ||
try { | ||
slot = queryPromisedValueSlot(promiseHandle); | ||
} catch (Exception e) { | ||
logger.log(Level.WARNING, "Find slot for promise with handle " + promiseHandle + " failed.", e); | ||
} | ||
return slot; | ||
} | ||
|
||
private static Slot queryPromisedValueSlot(String promiseHandle) | ||
throws NoSuchObjectException { | ||
Slot slot = null; | ||
checkNonEmpty(promiseHandle, "promiseHandle"); | ||
Key key = KeyFactory.stringToKey(promiseHandle); | ||
Slot slot = null; | ||
// It is possible, though unlikely, that we might be asked to accept a | ||
// promise before the slot to hold the promise has been saved. We will try 5 | ||
// times, sleeping 1, 2, 4, 8 seconds between attempts. | ||
|
@@ -445,34 +509,9 @@ public static void acceptPromisedValue(String promiseHandle, Object value) | |
Thread.currentThread().interrupt(); | ||
} | ||
} | ||
Key generatorJobKey = slot.getGeneratorJobKey(); | ||
if (null == generatorJobKey) { | ||
throw new RuntimeException( | ||
"Pipeline is fatally corrupted. Slot for promised value has no generatorJobKey: " + slot); | ||
} | ||
JobRecord generatorJob = backEnd.queryJob(generatorJobKey, JobRecord.InflationType.NONE); | ||
if (null == generatorJob) { | ||
throw new RuntimeException("Pipeline is fatally corrupted. " | ||
+ "The generator job for a promised value slot was not found: " + generatorJobKey); | ||
} | ||
String childGraphGuid = generatorJob.getChildGraphGuid(); | ||
if (null == childGraphGuid) { | ||
// The generator job has not been saved with a childGraphGuid yet. This can happen if the | ||
// promise handle leaked out to an external thread before the job that generated it | ||
// had finished. | ||
throw new NoSuchObjectException( | ||
"The framework is not ready to accept the promised value yet. " | ||
+ "Please try again after the job that generated the promis handle has completed."); | ||
} | ||
if (!childGraphGuid.equals(slot.getGraphGuid())) { | ||
// The slot has been orphaned | ||
throw new OrphanedObjectException(promiseHandle); | ||
} | ||
UpdateSpec updateSpec = new UpdateSpec(slot.getRootJobKey()); | ||
registerSlotFilled(updateSpec, generatorJob.getQueueSettings(), slot, value); | ||
backEnd.save(updateSpec, generatorJob.getQueueSettings()); | ||
return slot; | ||
} | ||
|
||
/** | ||
* The root job instance used to wrap a user provided root if the user | ||
* provided root job has exceptionHandler specified. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to get a Promised Value that was created by an ancestor job?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is definitely clearer... but... this is the language that the rest of the javadoc uses.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am fine with such inconsistency (though maybe we can phrase it in a slightly more consistent way).
Basically, I think we should make it clear that it should not be used across Jobs.