diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java index e20612913..c5be80763 100644 --- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java @@ -47,7 +47,6 @@ import java.util.*; import java.util.concurrent.ExecutorService; -import java.util.function.BiConsumer; import java.util.function.Supplier; @Slf4j @@ -63,20 +62,6 @@ public class DAGOperations implements DAGOperationsInterface { private final DAGResultHandler dagResultHandler; - public static final BiConsumer OPERATE_WITH_RETRY = (operation, retryTimes) -> { - int exceptionCatchTimes = retryTimes; - for (int i = 1; i <= exceptionCatchTimes; i++) { - try { - operation.run(); - return; - } catch (Exception e) { - log.warn("operateWithRetry fails, invokeTimes:{}", i, e); - } - } - - operation.run(); - }; - public DAGOperations(ExecutorService runnerExecutor, Map taskRunners, DAGRunner dagRunner, TimeCheckRunner timeCheckRunner, DAGTraversal dagTraversal, Callback callback, DAGResultHandler dagResultHandler) { diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGTraversal.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGTraversal.java index c350133e8..df2c6bdb7 100644 --- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGTraversal.java +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGTraversal.java @@ -34,6 +34,7 @@ import com.weibo.rill.flow.olympicene.traversal.helper.ContextHelper; import com.weibo.rill.flow.olympicene.traversal.helper.PluginHelper; import com.weibo.rill.flow.olympicene.traversal.helper.Stasher; +import com.weibo.rill.flow.olympicene.traversal.utils.OperationUtil; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; @@ -80,7 +81,7 @@ public void submitTraversal(String executionId, String completedTaskName) { Runnable basicActions = () -> dagStorageProcedure.lockAndRun( LockerKey.buildDagInfoLockName(executionId), () -> doTraversal(executionId, completedTaskName)); Runnable runnable = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TRAVERSAL_CUSTOMIZED_PLUGINS); - DAGOperations.OPERATE_WITH_RETRY.accept(runnable, SystemConfig.getTraversalRetryTimes()); + OperationUtil.OPERATE_WITH_RETRY.accept(runnable, SystemConfig.getTraversalRetryTimes()); } catch (Exception e) { log.error("executionId:{} traversal exception with completedTaskName:{}. ", executionId, completedTaskName, e); } @@ -99,7 +100,7 @@ public void submitTasks(String executionId, Set taskInfos, Map context = ContextHelper.getInstance().getContext(dagContextStorage, executionId, taskInfo); dagOperations.runTasks(executionId, Lists.newArrayList(Pair.of(taskInfo, context))); }; - DAGOperations.OPERATE_WITH_RETRY.accept(operations, SystemConfig.getTimerRetryTimes()); + OperationUtil.OPERATE_WITH_RETRY.accept(operations, SystemConfig.getTimerRetryTimes()); break; default: log.warn("handleTimeCheck time check type nonsupport, type:{}", type); diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/utils/OperationUtil.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/utils/OperationUtil.java new file mode 100644 index 000000000..c33f4bdec --- /dev/null +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/utils/OperationUtil.java @@ -0,0 +1,24 @@ +package com.weibo.rill.flow.olympicene.traversal.utils; + +import lombok.extern.slf4j.Slf4j; + +import java.util.function.BiConsumer; + +@Slf4j +public class OperationUtil { + private OperationUtil() {} + + public static final BiConsumer OPERATE_WITH_RETRY = (operation, retryTimes) -> { + int exceptionCatchTimes = retryTimes; + for (int i = 1; i <= exceptionCatchTimes; i++) { + try { + operation.run(); + return; + } catch (Exception e) { + log.warn("operateWithRetry fails, invokeTimes:{}", i, e); + } + } + + operation.run(); + }; +}