From a1601595fefa721f3af8322a89c5f7f9db0f0b5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benjamin=20R=C3=B6gner?= Date: Sat, 26 Oct 2024 22:16:08 +0200 Subject: [PATCH] WIP MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Benjamin Rögner --- .../com/here/xyz/jobs/steps/StepGraph.java | 85 ++++++++++++++++++- .../xyz/jobs/steps/execution/JobExecutor.java | 14 ++- 2 files changed, 93 insertions(+), 6 deletions(-) diff --git a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/StepGraph.java b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/StepGraph.java index fcd0a54cd..d1d2bb0fc 100644 --- a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/StepGraph.java +++ b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/StepGraph.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeName; import com.here.xyz.jobs.service.Config; +import com.here.xyz.jobs.steps.resources.Load; +import com.here.xyz.util.service.BaseHttpServerVerticle.ValidationException; import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -137,9 +139,12 @@ public StepGraph withParallel(boolean parallel) { */ @Override public boolean isEquivalentTo(StepExecution other) { - if (!(other instanceof StepGraph otherGraph) || executions.size() != otherGraph.executions.size()) + if (!(other instanceof StepGraph otherGraph) + || otherGraph.isParallel() != isParallel() + || executions.size() != otherGraph.executions.size()) return false; + //FIXME: If this StepGraph is a parallel one, the order of the executions should not matter. 
In that case it only matters that for each execution there *exists* an equivalent execution in otherGraph for (int i = 0; i < executions.size(); i++) if (!executions.get(i).isEquivalentTo(otherGraph.executions.get(i))) return false; @@ -147,13 +152,13 @@ public boolean isEquivalentTo(StepExecution other) { } /** - * + * NOTE: The "other" StepGraph is the new one * @param other * @return */ public StepGraph findConnectedEquivalentSubGraph(StepGraph other) { - StepExecution currentNode; - StepExecution currentOtherNode; + StepExecution currentNode = null; + StepExecution currentOtherNode = null; StepGraph subGraph = new StepGraph() .withParallel(other.isParallel()); @@ -177,6 +182,22 @@ else if (!currentNode.isEquivalentTo(currentOtherNode)) return subGraph; } + private static StepExecution createPseudoNode(StepExecution previousNode) { + if (previousNode instanceof StepGraph previousNodeGraph) { + if (previousNodeGraph.isParallel()) + //Create a pseudo node for each previous node in the preceding parallel graph + return new StepGraph() + .withParallel(true) + .withExecutions(previousNodeGraph.executions.stream().map(prevNode -> createPseudoNode(prevNode)).toList()); + + //Get the last node of the sequential previous graph and create a pseudo node for it + return createPseudoNode(previousNodeGraph.executions.get(previousNodeGraph.executions.size() - 1)); + } + + Step previousStep = (Step) previousNode; + return new DelegateOutputsPseudoStep(previousStep.getJobId(), previousStep.getId()); + } + /** * Returns `true` if this graph is the empty graph and does not contain any executions. * @return `true` if this graph does not contain any executions @@ -193,4 +214,60 @@ public boolean isEmpty() { public int size() { return (int) stepStream().count(); } + + /** + * NOTE: This step implementation is a placeholder step that will be used by the JobExecutor to inject outputs of a formerly run + * job into the StepGraph of this job.
+ * This step represents a step of the formerly run job that was found to be a predecessor of the step at the border of the re-usable subgraph + * of the formerly run StepGraph that was cut out because it matched a part of the new job's StepGraph. + * This pseudo step creates a link between the new job's StepGraph and the old step's outputs. + * That way the succeeding Step(s) of the new StepGraph can access the outputs that have been produced by the old step. + */ + private static class DelegateOutputsPseudoStep extends Step { + private String delegateJobId; //The old job ID + private String delegateStepId; //The old step ID + + DelegateOutputsPseudoStep(String delegateJobId, String delegateStepId) { + this.delegateJobId = delegateJobId; + this.delegateStepId = delegateStepId; + } + + @Override + public List getNeededResources() { + return List.of(); + } + + @Override + public int getTimeoutSeconds() { + return 0; + } + + @Override + public int getEstimatedExecutionSeconds() { + return 0; + } + + @Override + public String getDescription() { + return "A pseudo step that just delegates outputs of a Step of another StepGraph into this StepGraph"; + } + + @Override + public void execute() throws Exception { + throw new IllegalAccessException("This method should never be called"); + } + + @Override + public void resume() throws Exception { + execute(); + } + + @Override + public void cancel() throws Exception {} + + @Override + public boolean validate() throws ValidationException { + return true; + } + } } diff --git a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/execution/JobExecutor.java b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/execution/JobExecutor.java index b7a916713..ee5649ad7 100644 --- a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/execution/JobExecutor.java +++ b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/execution/JobExecutor.java @@ -298,12 +298,22 @@ private Future mayExecute(Job job) {
})); } + /** + * Tries to find other existing jobs that are completed and that have already performed parts of the + * tasks that the provided job would have to perform. + * If such jobs are found, the provided job's StepGraph will re-use these parts of the already executed + * job and will be shrunk accordingly to lower its execution time and save cost. + * + * @param job The new job that is about to be started + * @return An empty future (NOTE: If the job's graph was adjusted / shrunk, it will also be stored) + */ private static Future reuseExistingJobIfPossible(Job job) { return JobConfigClient.getInstance().loadJobs(job.getResourceKey()) .compose(candidates -> shrinkGraphByReusingOtherGraph(job, candidates.stream() .filter(candidate -> !job.getId().equals(candidate.getId())) //Do not try to compare the job to itself - .map(candidate -> candidate.getSteps()) - .max(Comparator.comparingInt(graph -> graph.size())) //Take the candidate with the largest matching subgraph + .map(candidate -> candidate.getSteps().findConnectedEquivalentSubGraph(job.getSteps())) + .filter(candidateGraph -> !candidateGraph.isEmpty()) + .max(Comparator.comparingInt(candidateGraph -> candidateGraph.size())) //Take the candidate with the largest matching subgraph + .orElse(null))); }