Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added method to job to get PromiseValue using handle #14

Merged
merged 11 commits into from
May 12, 2015
41 changes: 41 additions & 0 deletions java/src/main/java/com/google/appengine/tools/pipeline/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.appengine.tools.pipeline.impl.PromisedValueImpl;
import com.google.appengine.tools.pipeline.impl.backend.UpdateSpec;
import com.google.appengine.tools.pipeline.impl.model.JobRecord;
import com.google.appengine.tools.pipeline.impl.model.Slot;

import java.io.Serializable;
import java.util.List;
Expand Down Expand Up @@ -379,12 +380,52 @@ public <F> PromisedValue<F> newPromise(Class<F> klass) {
return promisedValue;
}

/**
* Invoke this method from within the {@code run} method of a <b>generator
* job</b> in order to declare that some value will be provided asynchronously
* by some external agent.
*
* @param <F> The type of the asynchronously provided value.
* @return A {@code PromisedValue} that represents an empty value slot that
* will be filled at a later time when the external agent invokes
* {@link PipelineService#submitPromisedValue(String, Object)}. This
* may be passed in to further invocations of {@code futureCall()} in
* order to specify a data dependency.
*/
public <F> PromisedValue<F> newPromise() {
PromisedValueImpl<F> promisedValue =
new PromisedValueImpl<>(getPipelineKey(), thisJobRecord.getKey(), currentRunGUID);
updateSpec.getNonTransactionalGroup().includeSlot(promisedValue.getSlot());
return promisedValue;
}

/**
* Invoke this method from within the {@code run} method of a <b>generator
* job</b> in order to get some value that has been previously declared and
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to get a Promised Value that was created by an ancestor job?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is definitely clearer... but... this is the language that the rest of the javadoc uses.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine with such inconsistency (though maybe we can phrase it in a slightly more consistent way).
Basically, I think we should make it clear that it should not be used across Jobs.

* that will be provided asynchronously by some external agent. This can be
* used to share the same value with child {@link Job}s
*
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, one empty line is enough.

* @param promiseHandle The unique identifier for the {@link PromisedValue}
* obtained during the execution of some job via the method
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we say "ancestor" job instead of "some" job? BTW, did you check what happens if called
by the same Job that invoked newPromise()? I would expect that not to work (as Slot is probably not saved until job runs complete...)

* {@link PromisedValue#getHandle()}.
* @return A {@code PromisedValue} that represents an empty value slot that
* will be filled at a later time when the external agent invokes
* {@link PipelineService#submitPromisedValue(String, Object)}. This
* may be passed in to further invocations of {@code futureCall()} in
* order to specify a data dependency.
*
* Note: This method will return <code>null</code> if the slot for the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace "Note:" with having it part of the @return information.

* handle could not be found.
*/
public <F> PromisedValue<F> promise(String promiseHandle) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadoc.
Also, I wonder what is your use case? PromisedValue only expose its handle, so the only value that I can think
of is if you want to "pass around" the Value to deeply nested children and make their run depends on it.
I am pretty sure this is not going to work correctly if you try to use it with different root jobs (so maybe a more elegant form of passing [not by handle] that would only work between parent to child would be preferred).
In any case, I think you should provide tests for it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My use case is described in issue #13 and you are not far off.

I have no intention of using the promised value across root jobs, so I have no problem with adding that as a restriction.

Updates soon...

Slot slot = PipelineManager.getPromisedValueSlot(promiseHandle);
PromisedValueImpl<F> promisedValue = null;
if (slot != null) {
promisedValue = new PromisedValueImpl<F>(slot);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use Java 7 based syntax: promisedValue = new PromisedValueImpl<>(slot);

}
return promisedValue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should it really return a null if not found? In any case, document.
Also, when adding tests please make sure you cover the case of returning a PromissedValue that was already filled (as well as one that wasn't).
You can have more checks here (e.g. verify that Slot belongs to the same rootJob...)

}

/**
* Invoke this method from within the {@code run} method of a <b>generator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.appengine.tools.pipeline.JobSetting;
import com.google.appengine.tools.pipeline.NoSuchObjectException;
import com.google.appengine.tools.pipeline.OrphanedObjectException;
import com.google.appengine.tools.pipeline.PromisedValue;
import com.google.appengine.tools.pipeline.Value;
import com.google.appengine.tools.pipeline.impl.backend.AppEngineBackEnd;
import com.google.appengine.tools.pipeline.impl.backend.PipelineBackEnd;
Expand Down Expand Up @@ -413,11 +414,74 @@ public static void deletePipelineRecords(String pipelineHandle, boolean force, b
backEnd.deletePipeline(key, force, async);
}

/**
* Fills a the {@link PromisedValue}.
* @param promiseHandle the key to the promised value slot.
* @param value the value to fill.
* @throws NoSuchObjectException If there is no Job with the given key.
* @throws OrphanedObjectException If the slot has been orphaned.
*/
public static void acceptPromisedValue(String promiseHandle, Object value)
throws NoSuchObjectException, OrphanedObjectException {
Slot slot = queryPromisedValueSlot(promiseHandle);
JobRecord generatorJob = getGeneratorJob(slot);
UpdateSpec updateSpec = new UpdateSpec(slot.getRootJobKey());
registerSlotFilled(updateSpec, generatorJob.getQueueSettings(), slot, value);
backEnd.save(updateSpec, generatorJob.getQueueSettings());
}

private static JobRecord getGeneratorJob(Slot slot)
throws OrphanedObjectException, NoSuchObjectException {
Key generatorJobKey = slot.getGeneratorJobKey();
if (null == generatorJobKey) {
throw new RuntimeException(
"Pipeline is fatally corrupted. Slot for promised value has no generatorJobKey: " + slot);
}
JobRecord generatorJob = backEnd.queryJob(generatorJobKey, JobRecord.InflationType.NONE);
if (null == generatorJob) {
throw new RuntimeException("Pipeline is fatally corrupted. "
+ "The generator job for a promised value slot was not found: " + generatorJobKey);
}
String childGraphGuid = generatorJob.getChildGraphGuid();
if (null == childGraphGuid) {
// The generator job has not been saved with a childGraphGuid yet. This can happen if the
// promise handle leaked out to an external thread before the job that generated it
// had finished.
throw new NoSuchObjectException(
"The framework is not ready to accept the promised value yet. "
+ "Please try again after the job that generated the promis handle has completed.");
}
if (!childGraphGuid.equals(slot.getGraphGuid())) {
// The slot has been orphaned
throw new OrphanedObjectException(KeyFactory.keyToString(slot.getKey()));
}

return generatorJob;
}

/**
* Get the {@link Slot} for a promise handle that can be used to construct a
* {@link PromisedValue} for use in child {@link Job}s. It will attempt to lookup
* the key 5 times with an exponential back-off just in case it has been requested
* before the promise has been registered.
* @param promiseHandle Key to find the {@link Slot}
* @return A {@link Slot} if the handle can be resolved otherwise <code>null</code>.
*/
public static Slot getPromisedValueSlot(String promiseHandle) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of it is a copy/paste from acceptPromisedValue. Please refactor to make acceptPromisedValue call this method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this method? Can dealing with the exception be part of Job?
This method would mask InterruptedException.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are worried about masking the exception I can follow the example of acceptPromisedValue and throw NoSuchObjectException and OrphanedObjectException, but the method itself is required because queryPromisedValueSlot is private and I am not sure I should change it's scope.

Edit (note): if I make it throw the exceptions then I will probably have to throw them in promise() which does not seem to be done by any other job methods.

Slot slot = null;
try {
slot = queryPromisedValueSlot(promiseHandle);
} catch (Exception e) {
logger.log(Level.WARNING, "Find slot for promise with handle " + promiseHandle + " failed.", e);
}
return slot;
}

private static Slot queryPromisedValueSlot(String promiseHandle)
throws NoSuchObjectException {
Slot slot = null;
checkNonEmpty(promiseHandle, "promiseHandle");
Key key = KeyFactory.stringToKey(promiseHandle);
Slot slot = null;
// It is possible, though unlikely, that we might be asked to accept a
// promise before the slot to hold the promise has been saved. We will try 5
// times, sleeping 1, 2, 4, 8 seconds between attempts.
Expand Down Expand Up @@ -445,34 +509,9 @@ public static void acceptPromisedValue(String promiseHandle, Object value)
Thread.currentThread().interrupt();
}
}
Key generatorJobKey = slot.getGeneratorJobKey();
if (null == generatorJobKey) {
throw new RuntimeException(
"Pipeline is fatally corrupted. Slot for promised value has no generatorJobKey: " + slot);
}
JobRecord generatorJob = backEnd.queryJob(generatorJobKey, JobRecord.InflationType.NONE);
if (null == generatorJob) {
throw new RuntimeException("Pipeline is fatally corrupted. "
+ "The generator job for a promised value slot was not found: " + generatorJobKey);
}
String childGraphGuid = generatorJob.getChildGraphGuid();
if (null == childGraphGuid) {
// The generator job has not been saved with a childGraphGuid yet. This can happen if the
// promise handle leaked out to an external thread before the job that generated it
// had finished.
throw new NoSuchObjectException(
"The framework is not ready to accept the promised value yet. "
+ "Please try again after the job that generated the promis handle has completed.");
}
if (!childGraphGuid.equals(slot.getGraphGuid())) {
// The slot has been orphaned
throw new OrphanedObjectException(promiseHandle);
}
UpdateSpec updateSpec = new UpdateSpec(slot.getRootJobKey());
registerSlotFilled(updateSpec, generatorJob.getQueueSettings(), slot, value);
backEnd.save(updateSpec, generatorJob.getQueueSettings());
return slot;
}

/**
* The root job instance used to wrap a user provided root if the user
* provided root job has exceptionHandler specified.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@
*/
public class PromisedValueImpl<E> extends FutureValueImpl<E> implements PromisedValue<E> {

public PromisedValueImpl(Slot slot) {
super(slot);
}

public PromisedValueImpl(Key rootJobGuid, Key generatorJobKey, String graphGUID) {
super(new Slot(rootJobGuid, generatorJobKey, graphGUID));
this(new Slot(rootJobGuid, generatorJobKey, graphGUID));
}

@Override
Expand Down