Skip to content

Commit

Permalink
feat(core): restarting Subflow
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Jan 7, 2025
1 parent a5469c3 commit 1e36d1e
Show file tree
Hide file tree
Showing 14 changed files with 307 additions and 24 deletions.
1 change: 1 addition & 0 deletions core/src/main/java/io/kestra/core/models/Label.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public record Label(@NotNull String key, @NotNull String value) {
public static final String USERNAME = SYSTEM_PREFIX + "username";
public static final String APP = SYSTEM_PREFIX + "app";
public static final String READ_ONLY = SYSTEM_PREFIX + "readOnly";
public static final String RESTARTED = SYSTEM_PREFIX + "restarted";

/**
* Static helper method for converting a map to a list of labels.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public SubflowId subflowId() {
return new SubflowId(namespace, flowId, subflowTask.subflowId().revision());
}

@Override
public RestartBehavior getRestartBehavior() {
return subflowTask.getRestartBehavior();
}

@Override
public String getId() {
return ((TaskInterface) subflowTask).getId();
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/io/kestra/core/models/tasks/ExecutableTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ Optional<SubflowExecutionResult> createSubflowExecutionResult(RunContext runCont
*/
SubflowId subflowId();

/**
* Returns the restart behavior of subflow executions.
*/
RestartBehavior getRestartBehavior();

record SubflowId(String namespace, String flowId, Optional<Integer> revision) {
public String flowUid() {
// as the Flow task can only be used in the same tenant we can hardcode null here
Expand All @@ -54,4 +59,9 @@ public String flowUidWithoutRevision() {
return Flow.uidWithoutRevision(null, this.namespace, this.flowId);
}
}

enum RestartBehavior {
NEW_EXECUTION,
RETRY_FAILED
}
}
75 changes: 67 additions & 8 deletions core/src/main/java/io/kestra/core/runners/ExecutableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.Storage;
import io.kestra.core.utils.MapUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.stream.Streams;

Expand Down Expand Up @@ -53,7 +56,7 @@ public static SubflowExecutionResult subflowExecutionResult(TaskRun parentTaskru
.build();
}

public static <T extends Task & ExecutableTask<?>> SubflowExecution<?> subflowExecution(
public static <T extends Task & ExecutableTask<?>> Optional<SubflowExecution<?>> subflowExecution(
RunContext runContext,
FlowExecutorInterface flowExecutorInterface,
Execution currentExecution,
Expand All @@ -65,6 +68,50 @@ public static <T extends Task & ExecutableTask<?>> SubflowExecution<?> subflowEx
boolean inheritLabels,
Property<ZonedDateTime> scheduleDate
) throws IllegalVariableEvaluationException {
// If we are in a flow that is restarted, we search for existing run of the task to restart them
if (currentExecution.getLabels().contains(new Label(Label.RESTARTED, "true")) && currentTask.getRestartBehavior() == ExecutableTask.RestartBehavior.RETRY_FAILED) {
ExecutionRepositoryInterface executionRepository = ((DefaultRunContext) runContext).getApplicationContext().getBean(ExecutionRepositoryInterface.class);

Optional<Execution> existingSubflowExecution = Optional.empty();
if (currentTaskRun.getOutputs() != null && currentTaskRun.getOutputs().containsKey("executionId")) {
// we know which execution to restart; this should be the case for Subflow tasks
existingSubflowExecution = executionRepository.findById(currentExecution.getTenantId(), (String) currentTaskRun.getOutputs().get("executionId"));
}

if (existingSubflowExecution.isEmpty()) {
// otherwise, we try to find the correct one; this should be the case for ForEachItem tasks
List<Execution> childExecutions = executionRepository.findAllByTriggerExecutionId(currentExecution.getTenantId(), currentExecution.getId())
.filter(e -> e.getNamespace().equals(currentTask.subflowId().namespace()) && e.getFlowId().equals(currentTask.subflowId().flowId()) && e.getTrigger().getId().equals(currentTask.getId()))
.filter(e -> Objects.equals(e.getTrigger().getVariables().get("taskRunId"), currentTaskRun.getId()) && Objects.equals(e.getTrigger().getVariables().get("taskRunValue"), currentTaskRun.getValue()) && Objects.equals(e.getTrigger().getVariables().get("taskRunIteration"), currentTaskRun.getIteration()))
.collectList()
.block();

if (childExecutions != null && childExecutions.size() == 1) {
// if there are more than one, we ignore the results and create a new one
existingSubflowExecution = Optional.of(childExecutions.getFirst());
}
}

if (existingSubflowExecution.isPresent()) {
Execution subflowExecution = existingSubflowExecution.get();
if (!subflowExecution.getState().isFailed()) {
// don't restart it as it's terminated successfully
return Optional.empty();
}
ExecutionService executionService = ((DefaultRunContext) runContext).getApplicationContext().getBean(ExecutionService.class);
try {
Execution restarted = executionService.restart(subflowExecution, null);
return Optional.of(SubflowExecution.builder()
.parentTask(currentTask)
.parentTaskRun(currentTaskRun.withState(State.Type.RUNNING))
.execution(restarted)
.build());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

String subflowNamespace = runContext.render(currentTask.subflowId().namespace());
String subflowId = runContext.render(currentTask.subflowId().flowId());
Optional<Integer> subflowRevision = currentTask.subflowId().revision();
Expand Down Expand Up @@ -93,12 +140,19 @@ public static <T extends Task & ExecutableTask<?>> SubflowExecution<?> subflowEx
labels.forEach(throwConsumer(label -> newLabels.add(new Label(runContext.render(label.key()), runContext.render(label.value())))));
}

Map<String, Object> variables = ImmutableMap.of(
var variables = ImmutableMap.<String, Object>builder().putAll(Map.of(
"executionId", currentExecution.getId(),
"namespace", currentFlow.getNamespace(),
"flowId", currentFlow.getId(),
"flowRevision", currentFlow.getRevision()
);
"flowRevision", currentFlow.getRevision(),
"taskRunId", currentTaskRun.getId()
));
if (currentTaskRun.getValue() != null) {
variables.put("taskRunValue", currentTaskRun.getValue());
}
if (currentTaskRun.getIteration() != null) {
variables.put("taskRunIteration", currentTaskRun.getIteration());
}

FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
Instant scheduleOnDate = runContext.render(scheduleDate).as(ZonedDateTime.class).map(date -> date.toInstant()).orElse(null);
Expand All @@ -111,15 +165,15 @@ public static <T extends Task & ExecutableTask<?>> SubflowExecution<?> subflowEx
.withTrigger(ExecutionTrigger.builder()
.id(currentTask.getId())
.type(currentTask.getType())
.variables(variables)
.variables(variables.build())
.build()
)
.withScheduleDate(scheduleOnDate);
return SubflowExecution.builder()
return Optional.of(SubflowExecution.builder()
.parentTask(currentTask)
.parentTaskRun(currentTaskRun.withState(State.Type.RUNNING))
.execution(execution)
.build();
.build());
}

private static List<Label> systemLabels(Execution execution) {
Expand All @@ -142,7 +196,7 @@ public static TaskRun manageIterations(Storage storage, TaskRun taskRun, Executi
Optional.empty();

// search for the previous iterations, if not found, we init it with an empty map
Map<String, Integer> iterations = previousTaskRun.getOutputs() != null ?
Map<String, Integer> iterations = !MapUtils.isEmpty(previousTaskRun.getOutputs()) ?
(Map<String, Integer>) previousTaskRun.getOutputs().get(TASK_VARIABLE_ITERATIONS) :
new HashMap<>();

Expand All @@ -151,6 +205,11 @@ public static TaskRun manageIterations(Storage storage, TaskRun taskRun, Executi
if (previousState.isPresent() && previousState.get() != currentState) {
int previousStateIterations = iterations.getOrDefault(previousState.get().toString(), numberOfBatches);
iterations.put(previousState.get().toString(), previousStateIterations - 1);

if (previousState.get() == State.Type.RESTARTED) {
// if we are in a restart, we need to reset the failed executions
iterations.put(State.Type.FAILED.toString(), 0);
}
}

// update the state to success if terminatedIterations == numberOfBatches
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.ExecutionKilledExecution;
Expand Down Expand Up @@ -213,7 +214,11 @@ public Execution restart(final Execution execution, @Nullable Integer revision)
execution.withState(State.Type.RESTARTED).getState()
);

newExecution = newExecution.withMetadata(execution.getMetadata().nextAttempt());
List<Label> newLabels = new ArrayList<>(execution.getLabels());
if (!newLabels.contains(new Label(Label.RESTARTED, "true"))) {
newLabels.add(new Label(Label.RESTARTED, "true"));
}
newExecution = newExecution.withMetadata(execution.getMetadata().nextAttempt()).withLabels(newLabels);

return revision != null ? newExecution.withFlowRevision(revision) : newExecution;
}
Expand Down
25 changes: 21 additions & 4 deletions core/src/main/java/io/kestra/plugin/core/flow/ForEachItem.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@
The `items` value must be Kestra's internal storage URI e.g. an output file from a previous task, or a file from inputs of FILE type.
Two special variables are available to pass as inputs to the subflow:
- `taskrun.items` which is the URI of internal storage file containing the batch of items to process
- `taskrun.iteration` which is the iteration or batch number"""
- `taskrun.iteration` which is the iteration or batch number
Restarting a parent flow will restart any subflows that has previously been executed."""
)
@Plugin(
examples = {
Expand Down Expand Up @@ -315,6 +317,17 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
@Valid
private List<Task> errors;

@Schema(
title = "What to do when a failed execution is restarting.",
description = """
- RETRY_FAILED (default): will restart the each subflow executions that are failed.
- NEW_EXECUTION: will create a new subflow execution for each batch of items.""
"""
)
@NotNull
@Builder.Default
private ExecutableTask.RestartBehavior restartBehavior = ExecutableTask.RestartBehavior.RETRY_FAILED;

@Override
public GraphCluster tasksTree(Execution execution, TaskRun taskRun, List<String> parentValues) throws IllegalVariableEvaluationException {
GraphCluster subGraph = new GraphCluster(this, taskRun, parentValues, RelationType.SEQUENTIAL);
Expand Down Expand Up @@ -359,7 +372,7 @@ public List<Task> getTasks() {
return List.of(
new ForEachItemSplit(this.getId(), this.items, this.batch),
new ForEachItemExecutable(this.getId(), this.inputs, this.inheritLabels, this.labels, this.wait, this.transmitFailed, this.scheduleDate,
new ExecutableTask.SubflowId(this.namespace, this.flowId, Optional.ofNullable(this.revision))
new ExecutableTask.SubflowId(this.namespace, this.flowId, Optional.ofNullable(this.revision)), this.restartBehavior
),
new ForEachItemMergeOutputs(this.getId())
);
Expand Down Expand Up @@ -424,15 +437,17 @@ public static class ForEachItemExecutable extends Task implements ExecutableTask
private Boolean transmitFailed;
private Property<ZonedDateTime> scheduleOn;
private SubflowId subflowId;
private RestartBehavior restartBehavior;

private ForEachItemExecutable(String parentId, Map<String, Object> inputs, Boolean inheritLabels, List<Label> labels, Boolean wait, Boolean transmitFailed, Property<ZonedDateTime> scheduleOn, SubflowId subflowId) {
private ForEachItemExecutable(String parentId, Map<String, Object> inputs, Boolean inheritLabels, List<Label> labels, Boolean wait, Boolean transmitFailed, Property<ZonedDateTime> scheduleOn, SubflowId subflowId, RestartBehavior restartBehavior) {
this.inputs = inputs;
this.inheritLabels = inheritLabels;
this.labels = labels;
this.wait = wait;
this.transmitFailed = transmitFailed;
this.scheduleOn = scheduleOn;
this.subflowId = subflowId;
this.restartBehavior = restartBehavior;

this.id = parentId + SUFFIX;
this.type = ForEachItemExecutable.class.getName();
Expand All @@ -458,7 +473,7 @@ public List<SubflowExecution<?>> createSubflowExecutions(

return splits
.stream()
.<SubflowExecution<?>>map(throwFunction(
.map(throwFunction(
split -> {
int iteration = currentIteration.getAndIncrement();
// these are special variable that can be passed to the subflow
Expand Down Expand Up @@ -492,6 +507,8 @@ public List<SubflowExecution<?>> createSubflowExecutions(
);
}
))
.filter(Optional::isPresent)
.<SubflowExecution<?>>map(Optional::get)
.toList();
} catch (IOException e) {
throw new InternalException(e);
Expand Down
30 changes: 19 additions & 11 deletions core/src/main/java/io/kestra/plugin/core/flow/Subflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,9 @@

import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.apache.commons.lang3.stream.Streams;

import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;

@SuperBuilder
Expand All @@ -53,7 +47,8 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Create a subflow execution. Subflows offer a modular way to reuse workflow logic by calling other flows just like calling a function in a programming language."
title = "Create a subflow execution. Subflows offer a modular way to reuse workflow logic by calling other flows just like calling a function in a programming language.",
description = "Restarting a parent flow will restart any subflows that has previously been executed."
)
@Plugin(
examples = {
Expand Down Expand Up @@ -157,9 +152,20 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi

@Schema(
title = "Don't trigger the subflow now but schedule it on a specific date."
)
)
private Property<ZonedDateTime> scheduleDate;

@Schema(
title = "What to do when a failed execution is restarting.",
description = """
- RETRY_FAILED (default): will restart the subflow execution if it's failed.
- NEW_EXECUTION: will create a new subflow execution.""
"""
)
@NotNull
@Builder.Default
private RestartBehavior restartBehavior = RestartBehavior.RETRY_FAILED;

@Override
public List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext,
FlowExecutorInterface flowExecutorInterface,
Expand All @@ -171,7 +177,7 @@ public List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext,
inputs.putAll(runContext.render(this.inputs));
}

return List.of(ExecutableUtils.subflowExecution(
return ExecutableUtils.subflowExecution(
runContext,
flowExecutorInterface,
currentExecution,
Expand All @@ -182,7 +188,9 @@ public List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext,
labels,
inheritLabels,
scheduleDate
));
)
.<List<SubflowExecution<?>>>map(subflowExecution -> List.of(subflowExecution))
.orElse(Collections.emptyList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ void restartMultiple() throws Exception {
restartCaseTest.restartMultiple();
}

@Test
@LoadFlows({"flows/valids/restart-parent.yaml", "flows/valids/restart-child.yaml"})
void restartSubflow() throws Exception {
restartCaseTest.restartSubflow();
}

@RetryingTest(5)
@LoadFlows({"flows/valids/trigger-flow-listener-no-inputs.yaml",
"flows/valids/trigger-flow-listener.yaml",
Expand Down
Loading

0 comments on commit 1e36d1e

Please sign in to comment.