-
Notifications
You must be signed in to change notification settings - Fork 61
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
added method to job to get PromiseValue using handle #14
Changes from 1 commit
b0e27bf
9b43be5
202d8f2
c9ad75f
f6821af
99db1fc
eba5642
9e418c2
32321eb
79c02c3
7349c00
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
import com.google.appengine.tools.pipeline.impl.PromisedValueImpl; | ||
import com.google.appengine.tools.pipeline.impl.backend.UpdateSpec; | ||
import com.google.appengine.tools.pipeline.impl.model.JobRecord; | ||
import com.google.appengine.tools.pipeline.impl.model.Slot; | ||
|
||
import java.io.Serializable; | ||
import java.util.List; | ||
|
@@ -385,6 +386,15 @@ public <F> PromisedValue<F> newPromise() { | |
updateSpec.getNonTransactionalGroup().includeSlot(promisedValue.getSlot()); | ||
return promisedValue; | ||
} | ||
|
||
public <F> PromisedValue<F> promise(String promiseHandle) { | ||
Slot slot = PipelineManager.getPromisedValueSlot(promiseHandle); | ||
PromisedValueImpl<F> promisedValue = null; | ||
if (slot != null) { | ||
promisedValue = new PromisedValueImpl<F>(slot); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could use Java 7 based syntax: promisedValue = new PromisedValueImpl<>(slot); |
||
} | ||
return promisedValue; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should it really return a null if not found? In any case, document. |
||
} | ||
|
||
/** | ||
* Invoke this method from within the {@code run} method of a <b>generator | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -473,6 +473,67 @@ public static void acceptPromisedValue(String promiseHandle, Object value) | |
backEnd.save(updateSpec, generatorJob.getQueueSettings()); | ||
} | ||
|
||
public static Slot getPromisedValueSlot(String promiseHandle) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Most of it is a copy/paste from acceptPromisedValue. Please refactor to make acceptPromisedValue call this method. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we really need this method? Can dealing with the exception be part of Job? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you are worried about masking the exception I can follow the example of acceptPromisedValue and throw NoSuchObjectException and OrphanedObjectException, but the method itself is required because queryPromisedValueSlot is private and I am not sure I should change it's scope. Edit (note): if I make it throw the exceptions then I will probably have to throw them in promise() which does not seem to be done by any other job methods. |
||
Slot slot = null; | ||
try { | ||
checkNonEmpty(promiseHandle, "promiseHandle"); | ||
Key key = KeyFactory.stringToKey(promiseHandle); | ||
// It is possible, though unlikely, that we might be asked to accept a | ||
// promise before the slot to hold the promise has been saved. We will try 5 | ||
// times, sleeping 1, 2, 4, 8 seconds between attempts. | ||
int attempts = 0; | ||
boolean interrupted = false; | ||
try { | ||
while (slot == null) { | ||
attempts++; | ||
try { | ||
slot = backEnd.querySlot(key, false); | ||
} catch (NoSuchObjectException e) { | ||
if (attempts >= 5) { | ||
throw new NoSuchObjectException("There is no promise with handle " + promiseHandle); | ||
} | ||
try { | ||
Thread.sleep((long) Math.pow(2.0, attempts - 1) * 1000L); | ||
} catch (InterruptedException f) { | ||
interrupted = true; | ||
} | ||
} | ||
} | ||
} finally { | ||
// TODO(user): replace with Uninterruptibles#sleepUninterruptibly once we use guava | ||
if (interrupted) { | ||
Thread.currentThread().interrupt(); | ||
} | ||
} | ||
Key generatorJobKey = slot.getGeneratorJobKey(); | ||
if (null == generatorJobKey) { | ||
throw new RuntimeException( | ||
"Pipeline is fatally corrupted. Slot for promised value has no generatorJobKey: " + slot); | ||
} | ||
JobRecord generatorJob = backEnd.queryJob(generatorJobKey, JobRecord.InflationType.NONE); | ||
if (null == generatorJob) { | ||
throw new RuntimeException("Pipeline is fatally corrupted. " | ||
+ "The generator job for a promised value slot was not found: " + generatorJobKey); | ||
} | ||
String childGraphGuid = generatorJob.getChildGraphGuid(); | ||
if (null == childGraphGuid) { | ||
// The generator job has not been saved with a childGraphGuid yet. This can happen if the | ||
// promise handle leaked out to an external thread before the job that generated it | ||
// had finished. | ||
throw new NoSuchObjectException( | ||
"The framework is not ready to accept the promised value yet. " | ||
+ "Please try again after the job that generated the promis handle has completed."); | ||
} | ||
if (!childGraphGuid.equals(slot.getGraphGuid())) { | ||
// The slot has been orphaned | ||
throw new OrphanedObjectException(promiseHandle); | ||
} | ||
} catch (Exception e) { | ||
logger.log(Level.WARNING, "Find slot for promise with handle " + promiseHandle + " failed.", e); | ||
} | ||
return slot; | ||
} | ||
|
||
/** | ||
* The root job instance used to wrap a user provided root if the user | ||
* provided root job has exceptionHandler specified. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,10 @@ | |
*/ | ||
public class PromisedValueImpl<E> extends FutureValueImpl<E> implements PromisedValue<E> { | ||
|
||
public PromisedValueImpl(Slot slot) { | ||
super(slot); | ||
} | ||
|
||
public PromisedValueImpl(Key rootJobGuid, Key generatorJobKey, String graphGUID) { | ||
super(new Slot(rootJobGuid, generatorJobKey, graphGUID)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. replace "super" with "this" |
||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
javadoc.
Also, I wonder what is your use case? PromisedValue only expose its handle, so the only value that I can think
of is if you want to "pass around" the Value to deeply nested children and make their run depends on it.
I am pretty sure this is not going to work correctly if you try to use it with different root jobs (so maybe a more elegant form of passing [not by handle] that would only work between parent to child would be preferred).
In any case, I think you should provide tests for it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My use case is described in issue #13 and you are not far off.
I have no intention of using the promised value across root jobs, so I have no problem with adding that as a restriction.
Updates soon...