Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added method to job to get PromiseValue using handle #14

Merged
merged 11 commits into from
May 12, 2015
10 changes: 10 additions & 0 deletions java/src/main/java/com/google/appengine/tools/pipeline/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.appengine.tools.pipeline.impl.PromisedValueImpl;
import com.google.appengine.tools.pipeline.impl.backend.UpdateSpec;
import com.google.appengine.tools.pipeline.impl.model.JobRecord;
import com.google.appengine.tools.pipeline.impl.model.Slot;

import java.io.Serializable;
import java.util.List;
Expand Down Expand Up @@ -385,6 +386,15 @@ public <F> PromisedValue<F> newPromise() {
updateSpec.getNonTransactionalGroup().includeSlot(promisedValue.getSlot());
return promisedValue;
}

public <F> PromisedValue<F> promise(String promiseHandle) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadoc.
Also, I wonder what is your use case? PromisedValue only expose its handle, so the only value that I can think
of is if you want to "pass around" the Value to deeply nested children and make their run depends on it.
I am pretty sure this is not going to work correctly if you try to use it with different root jobs (so maybe a more elegant form of passing [not by handle] that would only work between parent to child would be preferred).
In any case, I think you should provide tests for it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My use case is described in issue #13 and you are not far off.

I have no intention of using the promised value across root jobs, so I have no problem with adding that as a restriction.

Updates soon...

Slot slot = PipelineManager.getPromisedValueSlot(promiseHandle);
PromisedValueImpl<F> promisedValue = null;
if (slot != null) {
promisedValue = new PromisedValueImpl<F>(slot);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use Java 7 based syntax: promisedValue = new PromisedValueImpl<>(slot);

}
return promisedValue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should it really return a null if not found? In any case, document.
Also, when adding tests please make sure you cover the case of returning a PromissedValue that was already filled (as well as one that wasn't).
You can have more checks here (e.g. verify that Slot belongs to the same rootJob...)

}

/**
* Invoke this method from within the {@code run} method of a <b>generator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,67 @@ public static void acceptPromisedValue(String promiseHandle, Object value)
backEnd.save(updateSpec, generatorJob.getQueueSettings());
}

public static Slot getPromisedValueSlot(String promiseHandle) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of it is a copy/paste from acceptPromisedValue. Please refactor to make acceptPromisedValue call this method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this method? Can dealing with the exception be part of Job?
This method would mask InterruptedException.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are worried about masking the exception I can follow the example of acceptPromisedValue and throw NoSuchObjectException and OrphanedObjectException, but the method itself is required because queryPromisedValueSlot is private and I am not sure I should change it's scope.

Edit (note): if I make it throw the exceptions then I will probably have to throw them in promise() which does not seem to be done by any other job methods.

Slot slot = null;
try {
checkNonEmpty(promiseHandle, "promiseHandle");
Key key = KeyFactory.stringToKey(promiseHandle);
// It is possible, though unlikely, that we might be asked to accept a
// promise before the slot to hold the promise has been saved. We will try 5
// times, sleeping 1, 2, 4, 8 seconds between attempts.
int attempts = 0;
boolean interrupted = false;
try {
while (slot == null) {
attempts++;
try {
slot = backEnd.querySlot(key, false);
} catch (NoSuchObjectException e) {
if (attempts >= 5) {
throw new NoSuchObjectException("There is no promise with handle " + promiseHandle);
}
try {
Thread.sleep((long) Math.pow(2.0, attempts - 1) * 1000L);
} catch (InterruptedException f) {
interrupted = true;
}
}
}
} finally {
// TODO(user): replace with Uninterruptibles#sleepUninterruptibly once we use guava
if (interrupted) {
Thread.currentThread().interrupt();
}
}
Key generatorJobKey = slot.getGeneratorJobKey();
if (null == generatorJobKey) {
throw new RuntimeException(
"Pipeline is fatally corrupted. Slot for promised value has no generatorJobKey: " + slot);
}
JobRecord generatorJob = backEnd.queryJob(generatorJobKey, JobRecord.InflationType.NONE);
if (null == generatorJob) {
throw new RuntimeException("Pipeline is fatally corrupted. "
+ "The generator job for a promised value slot was not found: " + generatorJobKey);
}
String childGraphGuid = generatorJob.getChildGraphGuid();
if (null == childGraphGuid) {
// The generator job has not been saved with a childGraphGuid yet. This can happen if the
// promise handle leaked out to an external thread before the job that generated it
// had finished.
throw new NoSuchObjectException(
"The framework is not ready to accept the promised value yet. "
+ "Please try again after the job that generated the promis handle has completed.");
}
if (!childGraphGuid.equals(slot.getGraphGuid())) {
// The slot has been orphaned
throw new OrphanedObjectException(promiseHandle);
}
} catch (Exception e) {
logger.log(Level.WARNING, "Find slot for promise with handle " + promiseHandle + " failed.", e);
}
return slot;
}

/**
* The root job instance used to wrap a user provided root if the user
* provided root job has exceptionHandler specified.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
*/
public class PromisedValueImpl<E> extends FutureValueImpl<E> implements PromisedValue<E> {

public PromisedValueImpl(Slot slot) {
super(slot);
}

public PromisedValueImpl(Key rootJobGuid, Key generatorJobKey, String graphGUID) {
super(new Slot(rootJobGuid, generatorJobKey, graphGUID));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replace "super" with "this"

}
Expand Down