Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FM-751] Add system task offset evaluation strategy #179

Merged
merged 6 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.springframework.util.unit.DataSize;
import org.springframework.util.unit.DataUnit;

import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.core.execution.offset.OffsetEvaluationStrategy;

@ConfigurationProperties("conductor.app")
public class ConductorProperties {

Expand Down Expand Up @@ -97,6 +100,15 @@ public class ConductorProperties {
@DurationUnit(ChronoUnit.SECONDS)
private Duration systemTaskWorkerCallbackDuration = Duration.ofSeconds(30);

/**
* The strategy to be used for evaluation of the offset for a postponed system task of certain
* type.<br>
* Tasks that are not listed here use {@link
* ConductorProperties#systemTaskWorkerCallbackDuration} value.
*/
private Map<TaskType, OffsetEvaluationStrategy> systemTaskOffsetEvaluation =
Map.of(TaskType.JOIN, OffsetEvaluationStrategy.BACKOFF_TO_DEFAULT_OFFSET);

/**
* The interval (in milliseconds) at which system task queues will be polled by the system task
* workers.
Expand Down Expand Up @@ -353,6 +365,15 @@ public Duration getSystemTaskWorkerCallbackDuration() {
return systemTaskWorkerCallbackDuration;
}

public void setSystemTaskOffsetEvaluation(
final Map<TaskType, OffsetEvaluationStrategy> systemTaskOffsetEvaluation) {
this.systemTaskOffsetEvaluation = systemTaskOffsetEvaluation;
}

public Map<TaskType, OffsetEvaluationStrategy> getSystemTaskOffsetEvaluation() {
return systemTaskOffsetEvaluation;
}

public void setSystemTaskWorkerCallbackDuration(Duration systemTaskWorkerCallbackDuration) {
this.systemTaskWorkerCallbackDuration = systemTaskWorkerCallbackDuration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
*/
package com.netflix.conductor.core.execution;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.execution.offset.OffsetEvaluationStrategy;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.MetadataDAO;
Expand All @@ -35,6 +39,7 @@ public class AsyncSystemTaskExecutor {
private final long queueTaskMessagePostponeSecs;
private final long systemTaskCallbackTime;
private final WorkflowExecutor workflowExecutor;
private final Map<TaskType, OffsetEvaluationStrategy> systemTaskOffsetEvaluation;

private static final Logger LOGGER = LoggerFactory.getLogger(AsyncSystemTaskExecutor.class);

Expand All @@ -52,6 +57,7 @@ public AsyncSystemTaskExecutor(
conductorProperties.getSystemTaskWorkerCallbackDuration().getSeconds();
this.queueTaskMessagePostponeSecs =
conductorProperties.getTaskExecutionPostponeDuration().getSeconds();
this.systemTaskOffsetEvaluation = conductorProperties.getSystemTaskOffsetEvaluation();
}

/**
Expand Down Expand Up @@ -164,12 +170,15 @@ public void execute(WorkflowSystemTask systemTask, String taskId) {
hasTaskExecutionCompleted = true;
LOGGER.debug("{} removed from queue: {}", task, queueName);
} else {
task.setCallbackAfterSeconds(systemTaskCallbackTime);
systemTask
.getEvaluationOffset(task, systemTaskCallbackTime)
.ifPresentOrElse(
task::setCallbackAfterSeconds,
() -> task.setCallbackAfterSeconds(systemTaskCallbackTime));
final long callbackAfterSeconds =
systemTaskOffsetEvaluation
.getOrDefault(
TaskType.of(task.getTaskType()),
OffsetEvaluationStrategy.CONSTANT_DEFAULT_OFFSET)
.getTaskOffsetEvaluation()
.computeEvaluationOffset(
task, systemTaskCallbackTime, queueDAO.getSize(queueName));
task.setCallbackAfterSeconds(callbackAfterSeconds);
queueDAO.postpone(
queueName,
task.getTaskId(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2024 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.offset;

import com.netflix.conductor.model.TaskModel;

/**
* Computes the evaluation offset for a postponed task based on the task's poll count and a default
* offset. In this strategy offset increases exponentially until it reaches the default offset.<br>
* This strategy is appropriate for queues that require low latency of all tasks.<br>
* Sample evaluationOffset for different pollCounts and defaultOffset (queueSize is ignored):
*
* <table>
* <tr><th>pollCount</th><th>defaultOffset</th><th>evaluationOffset</th></tr>
* <tr><td>0</td><td>5</td><td>0</td></tr>
* <tr><td>1</td><td>5</td><td>0</td></tr>
* <tr><td>2</td><td>5</td><td>2</td></tr>
* <tr><td>3</td><td>5</td><td>4</td></tr>
* <tr><td>4</td><td>5</td><td>5</td></tr>
* <tr><td>4</td><td>10</td><td>8</td></tr>
* <tr><td>5</td><td>10</td><td>10</td></tr>
* </table>
*/
final class BackoffToDefaultOffsetEvaluation implements TaskOffsetEvaluation {

@Override
public long computeEvaluationOffset(
final TaskModel taskModel, final long defaultOffset, final int queueSize) {
final int index = taskModel.getPollCount() > 0 ? taskModel.getPollCount() - 1 : 0;
if (index == 0) {
return 0L;
}
return Math.min((long) Math.pow(2, index), defaultOffset);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2024 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.offset;

import com.netflix.conductor.model.TaskModel;

/** Dummy implementation of {@link TaskOffsetEvaluation} that always returns the default offset. */
final class ConstantDefaultOffsetEvaluation implements TaskOffsetEvaluation {
@Override
public long computeEvaluationOffset(
final TaskModel taskModel, final long defaultOffset, final int queueSize) {
return defaultOffset;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2024 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.offset;

/**
* Strategies used for computation of the task offset. The offset is used to postpone the task
* execution in the queue.
*/
public enum OffsetEvaluationStrategy {
/**
* Constant offset evaluation strategy - using default offset value.
*
* @see ConstantDefaultOffsetEvaluation
*/
CONSTANT_DEFAULT_OFFSET(new ConstantDefaultOffsetEvaluation()),
/**
* Computes the evaluation offset for a postponed task based on the task's poll count and a
* default offset. In this strategy offset increases exponentially until it reaches the default
* offset.
*
* @see BackoffToDefaultOffsetEvaluation
*/
BACKOFF_TO_DEFAULT_OFFSET(new BackoffToDefaultOffsetEvaluation()),
/**
* Computes the evaluation offset for a postponed task based on the queue size and the task's
* poll count. In this strategy offset increases exponentially until it reaches the (default
* offset * queue size) value.
*
* @see ScaledByQueueSizeOffsetEvaluation
*/
SCALED_BY_QUEUE_SIZE(new ScaledByQueueSizeOffsetEvaluation());

private final TaskOffsetEvaluation taskOffsetEvaluation;

OffsetEvaluationStrategy(final TaskOffsetEvaluation taskOffsetEvaluation) {
this.taskOffsetEvaluation = taskOffsetEvaluation;
}

/**
* Get the task offset evaluation strategy.
*
* @return {@link TaskOffsetEvaluation}
*/
public TaskOffsetEvaluation getTaskOffsetEvaluation() {
return taskOffsetEvaluation;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2024 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.offset;

import com.netflix.conductor.model.TaskModel;

/**
* Computes the evaluation offset for a postponed task based on the queue size and the task's poll
* count. In this strategy offset increases exponentially until it reaches the (default offset *
* queue size) value.<br>
* This strategy is appropriate for relatively big queues (100-1000s tasks) that contain
* long-running tasks (days-weeks) with high number of poll-counts.<br>
* Sample evaluationOffset for different pollCounts, defaultOffset and queueSize:
*
* <table>
* <tr><th>pollCount</th><th>defaultOffset</th><th>queueSize</th><th>evaluationOffset</th></tr>
* <tr><td>0</td><td>-</td><td>-</td><td>0</td></tr>
* <tr><td>1</td><td>-</td><td>-</td><td>0</td></tr>
* <tr><td>2</td><td>5</td><td>1</td><td>2</td></tr>
* <tr><td>3</td><td>5</td><td>1</td><td>4</td></tr>
* <tr><td>4</td><td>5</td><td>1</td><td>5</td></tr>
* <tr><td>4</td><td>5</td><td>0</td><td>5</td></tr>
* <tr><td>4</td><td>5</td><td>2</td><td>8</td></tr>
* </table>
*/
final class ScaledByQueueSizeOffsetEvaluation implements TaskOffsetEvaluation {

@Override
public long computeEvaluationOffset(
final TaskModel taskModel, final long defaultOffset, final int queueSize) {
int index = taskModel.getPollCount() > 0 ? taskModel.getPollCount() - 1 : 0;
if (index == 0) {
return 0L;
}
final long scaledOffset = queueSize > 0 ? queueSize * defaultOffset : defaultOffset;
return Math.min((long) Math.pow(2, index), scaledOffset);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2024 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.offset;

import com.netflix.conductor.model.TaskModel;

/** Service used for computation of the evaluation offset for the postponed task. */
public sealed interface TaskOffsetEvaluation
permits BackoffToDefaultOffsetEvaluation,
ConstantDefaultOffsetEvaluation,
ScaledByQueueSizeOffsetEvaluation {
/**
* Compute the evaluation offset for the postponed task.
*
* @param taskModel details about the postponed task
* @param defaultOffset the default offset provided by the configuration properties [seconds]
* @param queueSize the actual size of the queue before the task is postponed
* @return the computed evaluation offset [seconds]
*/
long computeEvaluationOffset(TaskModel taskModel, long defaultOffset, int queueSize);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
package com.netflix.conductor.core.execution.tasks;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -82,15 +81,6 @@ public boolean execute(
return false;
}

@Override
public Optional<Long> getEvaluationOffset(TaskModel taskModel, long defaultOffset) {
int index = taskModel.getPollCount() > 0 ? taskModel.getPollCount() - 1 : 0;
if (index == 0) {
return Optional.of(0L);
}
return Optional.of(Math.min((long) Math.pow(2, index), defaultOffset));
}

public boolean isAsync() {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ public boolean execute(
*/
public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {}

public Optional<Long> getEvaluationOffset(TaskModel taskModel, long defaultOffset) {
return Optional.empty();
}

/**
* @return True if the task is supposed to be started asynchronously using internal queues.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2024 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.offset;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import com.netflix.conductor.model.TaskModel;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class BackoffToDefaultOffsetEvaluationTest {
private static TaskOffsetEvaluation offsetEvaluation;

@BeforeAll
static void setUp() {
offsetEvaluation = new BackoffToDefaultOffsetEvaluation();
}

@AfterAll
static void tearDown() {
offsetEvaluation = null;
}

@Mock private TaskModel taskModel;

@ParameterizedTest
@CsvSource({"0, 5, 0", "1, 5, 0", "2, 5, 2", "3, 5, 4", "4, 5, 5", "4, 10, 8", "5, 10, 10"})
void testComputeEvaluationOffset(
final int pollCount, final long defaultOffset, final long expectedOffset) {
when(taskModel.getPollCount()).thenReturn(pollCount);
final var result = offsetEvaluation.computeEvaluationOffset(taskModel, defaultOffset, 10);
assertEquals(expectedOffset, result);
}
}
Loading
Loading