Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Rögner <[email protected]>
  • Loading branch information
roegi authored and mchrza committed Nov 11, 2024
1 parent 2ef4aab commit a160159
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.here.xyz.jobs.service.Config;
import com.here.xyz.jobs.steps.resources.Load;
import com.here.xyz.util.service.BaseHttpServerVerticle.ValidationException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -137,23 +139,26 @@ public StepGraph withParallel(boolean parallel) {
*/
@Override
public boolean isEquivalentTo(StepExecution other) {
//NOTE(review): The next line looks like the previous version of the condition that directly follows it
//(this is a diff view showing both variants) - confirm which of the two is actually in effect.
if (!(other instanceof StepGraph otherGraph) || executions.size() != otherGraph.executions.size())
//Not equivalent if "other" is no StepGraph, if the parallel flags differ or if the number of executions differs
if (!(other instanceof StepGraph otherGraph)
|| otherGraph.isParallel() != isParallel()
|| executions.size() != otherGraph.executions.size())
return false;

//FIXME: If this StepGraph is a parallel one, the order of the executions should not matter. In that case it only matters that for each execution there *exists* an equivalent execution in otherGraph
//Compare the executions pairwise by their position; the first non-equivalent pair makes the graphs non-equivalent
for (int i = 0; i < executions.size(); i++)
if (!executions.get(i).isEquivalentTo(otherGraph.executions.get(i)))
return false;
//All executions were found to be pairwise equivalent
return true;
}

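/**
* Finds a connected sub graph of this graph of which the executions are equivalent to executions of the provided graph.
* NOTE: The "other" StepGraph is the new one
* @param other The new StepGraph to which this graph is compared
* @return The sub graph that was found; its parallel flag is taken from "other" (NOTE(review): presumably empty if nothing matches - confirm)
*/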
public StepGraph findConnectedEquivalentSubGraph(StepGraph other) {
StepExecution currentNode;
StepExecution currentOtherNode;
StepExecution currentNode = null;
StepExecution currentOtherNode = null;
StepGraph subGraph = new StepGraph()
.withParallel(other.isParallel());

Expand All @@ -177,6 +182,22 @@ else if (!currentNode.isEquivalentTo(currentOtherNode))
return subGraph;
}

/**
 * Creates the pseudo node(s) that stand in for the provided previous node.
 *
 * - For a plain step, a {@link DelegateOutputsPseudoStep} is created from the step's job ID and step ID.
 * - For a sequential graph, the pseudo node of the graph's last execution is returned.
 * - For a parallel graph, a new parallel graph is returned that contains one pseudo node per execution.
 *
 * @param previousNode The node for which to create the pseudo node(s), either a StepGraph or a Step
 * @return The pseudo step, or a parallel StepGraph of pseudo nodes
 */
private static StepExecution createPseudoNode(StepExecution previousNode) {
  if (!(previousNode instanceof StepGraph precedingGraph)) {
    //Not a graph, so it is treated as a single step that gets its own pseudo step
    Step precedingStep = (Step) previousNode;
    return new DelegateOutputsPseudoStep(precedingStep.getJobId(), precedingStep.getId());
  }

  if (!precedingGraph.isParallel()) {
    //Sequential graph: Only its last node is relevant, so create the pseudo node for that one
    int lastIndex = precedingGraph.executions.size() - 1;
    return createPseudoNode(precedingGraph.executions.get(lastIndex));
  }

  //Parallel graph: Create a pseudo node for each of its nodes and wrap them into a new parallel graph
  StepGraph pseudoGraph = new StepGraph().withParallel(true);
  return pseudoGraph.withExecutions(precedingGraph.executions.stream()
      .map(StepGraph::createPseudoNode)
      .toList());
}

/**
* Returns `true` if this graph is the empty graph and does not contain any executions.
* @return `true` if this graph does not contain any executions
Expand All @@ -193,4 +214,60 @@ public boolean isEmpty() {
public int size() {
return (int) stepStream().count();
}

/**
* NOTE: This step implementation is a placeholder step that will be used by the JobExecutor to inject the outputs of a formerly run
* job into the StepGraph of this job.
* This step depicts a step of the formerly run job that was found to be a predecessor of the step at the border of the re-usable subgraph
* of the formerly run StepGraph that was cut out because it matched a part of the new job's StepGraph.
* This pseudo step creates a link between the new job's StepGraph and the old step's outputs.
* That way the succeeding step(s) of the new StepGraph can access the outputs that have been produced by the old step.
*
* The step is never meant to be executed: {@link #execute()} and {@link #resume()} always throw.
*/
private static class DelegateOutputsPseudoStep extends Step<DelegateOutputsPseudoStep> {
//NOTE(review): Both fields are only assigned within this class, nothing visible here reads them - presumably they are
//read elsewhere (e.g. by serialization), confirm
private String delegateJobId; //The old job ID
private String delegateStepId; //The old step ID

/**
* Creates a pseudo step that refers to a step of a formerly run job.
*
* @param delegateJobId The ID of the formerly run (old) job
* @param delegateStepId The ID of the step within the old job
*/
DelegateOutputsPseudoStep(String delegateJobId, String delegateStepId) {
this.delegateJobId = delegateJobId;
this.delegateStepId = delegateStepId;
}

/**
* @return Always an empty list, this pseudo step reports no needed resources
*/
@Override
public List<Load> getNeededResources() {
return List.of();
}

/**
* @return Always 0
*/
@Override
public int getTimeoutSeconds() {
return 0;
}

/**
* @return Always 0
*/
@Override
public int getEstimatedExecutionSeconds() {
return 0;
}

/**
* @return A fixed, human readable description of this pseudo step
*/
@Override
public String getDescription() {
return "A pseudo step that just delegates outputs of a Step of another StepGraph into this StepGraph";
}

/**
* Always fails, because this pseudo step is not supposed to be executed.
*
* @throws Exception Always (an IllegalAccessException)
*/
@Override
public void execute() throws Exception {
//NOTE(review): IllegalAccessException is usually associated with reflective access, an UnsupportedOperationException
//may express the intent better - confirm that no caller depends on the current exception type
throw new IllegalAccessException("This method should never be called");
}

/**
* Delegates to {@link #execute()} and therefore always fails as well.
*
* @throws Exception Always, see {@link #execute()}
*/
@Override
public void resume() throws Exception {
execute();
}

/**
* Does nothing.
*/
@Override
public void cancel() throws Exception {}

/**
* @return Always true, no validation is performed
*/
@Override
public boolean validate() throws ValidationException {
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -298,12 +298,22 @@ private Future<Boolean> mayExecute(Job job) {
}));
}

/**
* Tries to find other existing jobs that are completed and that already performed parts of the
* tasks that the provided job would have to perform.
* If such jobs are found, the provided job's StepGraph will re-use these parts of the already executed
* job and will be shrunk accordingly to lower its execution time and save cost.
*
* NOTE(review): The code visible here does not filter the candidates by their state - presumably the restriction to
* completed jobs happens elsewhere, confirm.
*
* @param job The new job that is about to be started
* @return An empty future (NOTE: If the job's graph was adjusted / shrunk, it will be also stored)
*/
private static Future<Void> reuseExistingJobIfPossible(Job job) {
//Load the candidate jobs using the resource key of the new job
return JobConfigClient.getInstance().loadJobs(job.getResourceKey())
.compose(candidates -> shrinkGraphByReusingOtherGraph(job, candidates.stream()
.filter(candidate -> !job.getId().equals(candidate.getId())) //Do not try to compare the job to itself
//NOTE(review): The following two lines look like the previous version of the three lines after them
//(this is a diff view showing both variants) - confirm which ones are actually in effect.
.map(candidate -> candidate.getSteps())
.max(Comparator.comparingInt(graph -> graph.size())) //Take the candidate with the largest matching subgraph
//For each candidate, determine the connected subgraph that is equivalent to a part of the new job's graph
.map(candidate -> candidate.getSteps().findConnectedEquivalentSubGraph(job.getSteps()))
//Drop the candidates for which no re-usable part was found
.filter(candidateGraph -> !candidateGraph.isEmpty())
.max(Comparator.comparingInt(candidateGraph -> candidateGraph.size())) //Take the candidate with the largest matching subgraph
//If no candidate is left, null is passed on instead of a graph
.orElse(null)));
}

Expand Down

0 comments on commit a160159

Please sign in to comment.