diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 00000000..fcb847f7 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,27 @@ +default_stages: [ commit ] +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v2.1.0 + hooks: + - id: check-merge-conflict + - repo: https://github.com/psf/black + rev: 22.3.0 + hooks: + - id: black + language_version: python3 + - repo: https://github.com/pycqa/isort + rev: 5.6.4 + hooks: + - id: isort + args: ["--profile", "black", "--filter-files"] + - repo: https://github.com/pycqa/flake8 + rev: 5.0.4 + hooks: + - id: flake8 + language_version: python3 + - repo: https://github.com/alessandrojcm/commitlint-pre-commit-hook + rev: v2.2.0 + hooks: + - id: commitlint + stages: [ commit-msg ] + additional_dependencies: [ '@commitlint/config-conventional' ] diff --git a/bamboo_engine/config.py b/bamboo_engine/config.py index 1034f4f7..bebf2d21 100644 --- a/bamboo_engine/config.py +++ b/bamboo_engine/config.py @@ -10,6 +10,8 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +from bamboo_engine.utils.constants import ExclusiveGatewayStrategy +from bamboo_engine.utils.expr import default_expr_func # 引擎内部配置模块 @@ -64,3 +66,7 @@ class Settings: MAKO_SANDBOX_IMPORT_MODULES = {} RERUN_INDEX_OFFSET = 0 + + PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC = default_expr_func + + PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY = ExclusiveGatewayStrategy.ONLY.value diff --git a/bamboo_engine/eri/interfaces.py b/bamboo_engine/eri/interfaces.py index 2e6c8869..def8de83 100644 --- a/bamboo_engine/eri/interfaces.py +++ b/bamboo_engine/eri/interfaces.py @@ -11,29 +11,29 @@ specific language governing permissions and limitations under the License. """ -from datetime import datetime from abc import ABCMeta, abstractmethod -from typing import List, Optional, Dict, Set, Any, Tuple +from datetime import datetime +from typing import Any, Dict, List, Optional, Set, Tuple from .models import ( - ScheduleInterruptPoint, - State, - Node, - Schedule, - ScheduleType, + CallbackData, + ContextValue, Data, DataInput, + DispatchProcess, + ExecuteInterruptEvent, + ExecuteInterruptPoint, ExecutionData, ExecutionHistory, ExecutionShortHistory, - CallbackData, + Node, ProcessInfo, - SuspendedProcessInfo, - DispatchProcess, - ContextValue, - ExecuteInterruptPoint, - ExecuteInterruptEvent, + Schedule, ScheduleInterruptEvent, + ScheduleInterruptPoint, + ScheduleType, + State, + SuspendedProcessInfo, ) # plugin interface @@ -1471,6 +1471,14 @@ def handle_schedule_interrupt_event(self, event: ScheduleInterruptEvent): """ +class ConfigMixin: + @abstractmethod + def get_config(self, name): + """ + 获取配置 + """ + + class EngineRuntimeInterface( PluginManagerMixin, EngineAPIHooksMixin, @@ -1483,6 +1491,7 @@ class EngineRuntimeInterface( DataMixin, ExecutionHistoryMixin, InterruptMixin, + ConfigMixin, metaclass=ABCMeta, ): @abstractmethod diff --git a/bamboo_engine/handlers/conditional_parallel_gateway.py b/bamboo_engine/handlers/conditional_parallel_gateway.py index 94b7333b..346597ea 100644 --- a/bamboo_engine/handlers/conditional_parallel_gateway.py +++ b/bamboo_engine/handlers/conditional_parallel_gateway.py @@ -17,13 +17,13 @@ from pyparsing import ParseException -from bamboo_engine.utils.boolrule import BoolRule -from bamboo_engine.template.template import Template -from bamboo_engine.interrupt import ExecuteKeyPoint -from bamboo_engine import states, metrics -from bamboo_engine.eri import NodeType, ProcessInfo, ExecuteInterruptPoint +from bamboo_engine import metrics, states from bamboo_engine.context import Context -from bamboo_engine.handler import register_handler, NodeHandler, ExecuteResult +from bamboo_engine.eri import ExecuteInterruptPoint, NodeType, ProcessInfo +from bamboo_engine.handler import ExecuteResult, NodeHandler, register_handler +from bamboo_engine.interrupt import ExecuteKeyPoint +from bamboo_engine.template.template import Template +from bamboo_engine.utils.constants import RuntimeSettings from bamboo_engine.utils.string import transform_escape_char logger = logging.getLogger("bamboo_engine") @@ -109,7 +109,8 @@ def execute( ) try: - result = BoolRule(resolved_evaluate).test() + expr_func = self.runtime.get_config(RuntimeSettings.PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC.value) + result = expr_func(resolved_evaluate, hydrated_context) logger.info( "root_pipeline[%s] node(%s) %s test result: %s", root_pipeline_id, diff --git a/bamboo_engine/handlers/exclusive_gateway.py b/bamboo_engine/handlers/exclusive_gateway.py index b0fbaa9d..35ff8aab 100644 --- a/bamboo_engine/handlers/exclusive_gateway.py +++ b/bamboo_engine/handlers/exclusive_gateway.py @@ -16,13 +16,12 @@ from pyparsing import ParseException -from bamboo_engine import states, metrics +from bamboo_engine import metrics, states from bamboo_engine.context import Context +from bamboo_engine.eri import ExecuteInterruptPoint, NodeType, ProcessInfo +from bamboo_engine.handler import ExecuteResult, NodeHandler, register_handler from bamboo_engine.template import Template -from bamboo_engine.handler import register_handler, NodeHandler, ExecuteResult -from bamboo_engine.utils.boolrule import BoolRule -from bamboo_engine.eri import NodeType, ProcessInfo, ExecuteInterruptPoint - +from bamboo_engine.utils.constants import ExclusiveGatewayStrategy, RuntimeSettings from bamboo_engine.utils.string import transform_escape_char logger = logging.getLogger("bamboo_engine") @@ -115,7 +114,8 @@ def execute( hydrated_context, ) try: - result = BoolRule(resolved_evaluate).test() + expr_func = self.runtime.get_config(RuntimeSettings.PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC.value) + result = expr_func(resolved_evaluate, hydrated_context) logger.info( "root_pipeline[%s] node(%s) %s test result: %s", root_pipeline_id, @@ -123,6 +123,13 @@ def execute( resolved_evaluate, result, ) + + strategy = self.runtime.get_config(RuntimeSettings.PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY.value) + # 如果策略是命中第一个,并且result为true, 则直接结束循环 + if strategy == ExclusiveGatewayStrategy.FIRST.value and result: + meet_conditions.append(c.name) + meet_targets.append(c.target_id) + break except ParseException as e: logger.exception(f"[exclusive_gateway] evaluation parse error: {e}") return self._execute_fail( diff --git a/bamboo_engine/utils/constants.py b/bamboo_engine/utils/constants.py index f58d8dec..1eb0f7df 100644 --- a/bamboo_engine/utils/constants.py +++ b/bamboo_engine/utils/constants.py @@ -10,13 +10,34 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ - +from enum import Enum from bamboo_engine.eri import ContextValueType - VAR_CONTEXT_MAPPING = { "plain": ContextValueType.PLAIN, "splice": ContextValueType.SPLICE, "lazy": ContextValueType.COMPUTE, } + + +class ExclusiveGatewayStrategy(Enum): + """ + 网关命中策略 + """ + + # 唯一命中 + ONLY = 1 + # 优先命中第一个 + FIRST = 2 + + +class RuntimeSettings(Enum): + PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC = "PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC" + PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY = "PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY" + + +RUNTIME_ALLOWED_CONFIG = [ + RuntimeSettings.PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC.value, + RuntimeSettings.PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY.value, +] diff --git a/bamboo_engine/utils/expr.py b/bamboo_engine/utils/expr.py new file mode 100644 index 00000000..58ffca1d --- /dev/null +++ b/bamboo_engine/utils/expr.py @@ -0,0 +1,6 @@ +# -*- coding: utf-8 -*- +from bamboo_engine.utils.boolrule import BoolRule + + +def default_expr_func(expr: str, context: dict) -> bool: + return BoolRule(expr).test() diff --git a/commitlint.config.js b/commitlint.config.js new file mode 100644 index 00000000..fe3c7e44 --- /dev/null +++ b/commitlint.config.js @@ -0,0 +1,64 @@ +const Configuration = { + /* + * Resolve and load @commitlint/config-conventional from node_modules. + * Referenced packages must be installed + */ + extends: [ + '@commitlint/config-conventional' + ], + /* + * Resolve and load conventional-changelog-atom from node_modules. + * Referenced packages must be installed + */ + formatter: '@commitlint/format', + /* + * Any rules defined here will override rules from @commitlint/config-conventional + */ + rules: { + 'type-enum': [ + 2, + 'always', + [ + 'feature', + 'bugfix', + 'minor', + 'optimization', + 'sprintfix', + 'refactor', + 'test', + 'docs', + 'merge', + ], + ] + }, + /* + * Functions that return true if commitlint should ignore the given message. + */ + ignores: [ + (commit) => commit === '', + (message) => message.includes('Merge'), + (message) => message.includes('merge') + ], + /* + * Whether commitlint uses the default ignore rules. + */ + defaultIgnores: true, + /* + * Custom URL to show upon failure + */ + helpUrl: + 'https://github.com/conventional-changelog/commitlint/#what-is-commitlint', + /* + * Custom prompt configs + */ + prompt: { + messages: {}, + questions: { + type: { + description: 'please input type:', + }, + }, + }, +}; + +module.exports = Configuration; diff --git a/runtime/bamboo-pipeline/pipeline/eri/apps.py b/runtime/bamboo-pipeline/pipeline/eri/apps.py index 2bd3e4c9..418ee112 100644 --- a/runtime/bamboo-pipeline/pipeline/eri/apps.py +++ b/runtime/bamboo-pipeline/pipeline/eri/apps.py @@ -12,8 +12,11 @@ """ from django.apps import AppConfig +from django.conf import settings +from pipeline.exceptions import ConfigValidationError from bamboo_engine.handlers import register +from bamboo_engine.utils.constants import ExclusiveGatewayStrategy class ERIConfig(AppConfig): @@ -24,3 +27,33 @@ def ready(self): from .celery.tasks import execute, schedule # noqa register() + + # 校验 PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC 配置 + if hasattr(settings, "PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC"): + + pipeline_exclusive_gateway_expr_func = getattr(settings, "PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC", None) + if not callable(pipeline_exclusive_gateway_expr_func): + raise ConfigValidationError("config validate error, the expr func must be callable, please check it") + + # 是否校验 + pipeline_exclusive_gateway_expr_func_check = getattr( + settings, "PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC_CHECK", False + ) + if not pipeline_exclusive_gateway_expr_func_check: + # 获取校验文本 + pipeline_exclusive_gateway_expr_func_text = getattr( + settings, "PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC_TEXT", "1==1" + ) + check_result = pipeline_exclusive_gateway_expr_func(pipeline_exclusive_gateway_expr_func_text, {}) + if not check_result: + raise ConfigValidationError("config validate error, the expr func return False") + + if hasattr(settings, "PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY"): + pipeline_exclusive_gateway_strategy = getattr(settings, "PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY") + if pipeline_exclusive_gateway_strategy not in [ + ExclusiveGatewayStrategy.ONLY.value, + ExclusiveGatewayStrategy.FIRST.value, + ]: + raise ConfigValidationError( + "config validate error, the pipeline_exclusive_gateway_strategy only support 1(ONLY), 2(FIRST)" + ) diff --git a/runtime/bamboo-pipeline/pipeline/eri/imp/config.py b/runtime/bamboo-pipeline/pipeline/eri/imp/config.py new file mode 100644 index 00000000..5a401059 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/eri/imp/config.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- +from django.conf import settings + +from bamboo_engine.config import Settings +from bamboo_engine.utils.constants import RUNTIME_ALLOWED_CONFIG + + +class ConfigMixin: + def get_config(self, name): + if name not in RUNTIME_ALLOWED_CONFIG: + raise ValueError("unsupported pipeline config, name={}".format(name)) + + custom_config_value = getattr(settings, name, None) + if custom_config_value: + return custom_config_value + return getattr(Settings, name) diff --git a/runtime/bamboo-pipeline/pipeline/eri/runtime.py b/runtime/bamboo-pipeline/pipeline/eri/runtime.py index 968b00f7..664157ff 100644 --- a/runtime/bamboo-pipeline/pipeline/eri/runtime.py +++ b/runtime/bamboo-pipeline/pipeline/eri/runtime.py @@ -12,35 +12,46 @@ """ import json -from typing import Optional, List, Tuple +from typing import List, Optional, Tuple from django.conf import settings from django.db import transaction - -from kombu import Exchange, Queue, Connection - -from bamboo_engine import states -from bamboo_engine.template import Template -from bamboo_engine.eri import interfaces -from bamboo_engine.eri import EngineRuntimeInterface, NodeType, ContextValueType - +from kombu import Connection, Exchange, Queue from pipeline.eri import codec -from pipeline.eri.utils import caculate_final_references, CONTEXT_VALUE_TYPE_MAP -from pipeline.eri.imp.plugin_manager import PipelinePluginManagerMixin +from pipeline.eri.celery.queues import QueueResolver +from pipeline.eri.imp.config import ConfigMixin +from pipeline.eri.imp.context import ContextMixin +from pipeline.eri.imp.data import DataMixin +from pipeline.eri.imp.event import EventMixin +from pipeline.eri.imp.execution_history import ExecutionHistoryMixin from pipeline.eri.imp.hooks import HooksMixin -from pipeline.eri.imp.process import ProcessMixin +from pipeline.eri.imp.interrupt import InterruptMixin from pipeline.eri.imp.node import NodeMixin -from pipeline.eri.imp.state import StateMixin +from pipeline.eri.imp.plugin_manager import PipelinePluginManagerMixin +from pipeline.eri.imp.process import ProcessMixin from pipeline.eri.imp.schedule import ScheduleMixin -from pipeline.eri.imp.data import DataMixin -from pipeline.eri.imp.context import ContextMixin -from pipeline.eri.imp.execution_history import ExecutionHistoryMixin +from pipeline.eri.imp.state import StateMixin from pipeline.eri.imp.task import TaskMixin -from pipeline.eri.imp.interrupt import InterruptMixin -from pipeline.eri.imp.event import EventMixin -from pipeline.eri.celery.queues import QueueResolver +from pipeline.eri.models import ( + ContextOutputs, + ContextValue, + Data, + ExecutionHistory, + LogEntry, + Node, + Process, + State, +) +from pipeline.eri.utils import CONTEXT_VALUE_TYPE_MAP, caculate_final_references -from pipeline.eri.models import Node, Data, ContextValue, Process, ContextOutputs, LogEntry, ExecutionHistory, State +from bamboo_engine import states +from bamboo_engine.eri import ( + ContextValueType, + EngineRuntimeInterface, + NodeType, + interfaces, +) +from bamboo_engine.template import Template class BambooDjangoRuntime( @@ -56,6 +67,7 @@ class BambooDjangoRuntime( HooksMixin, InterruptMixin, EventMixin, + ConfigMixin, EngineRuntimeInterface, ): diff --git a/runtime/bamboo-pipeline/pipeline/exceptions.py b/runtime/bamboo-pipeline/pipeline/exceptions.py index 932b8c18..d5222687 100644 --- a/runtime/bamboo-pipeline/pipeline/exceptions.py +++ b/runtime/bamboo-pipeline/pipeline/exceptions.py @@ -150,6 +150,10 @@ class AttributeValidationError(TagError): pass +class ConfigValidationError(PipelineError): + pass + + # # constant exception # diff --git a/tests/handlers/test_conditional_parallel_gateway.py b/tests/handlers/test_conditional_parallel_gateway.py index d6e7d1e8..d6b06eac 100644 --- a/tests/handlers/test_conditional_parallel_gateway.py +++ b/tests/handlers/test_conditional_parallel_gateway.py @@ -15,18 +15,19 @@ from mock import MagicMock, patch from bamboo_engine import states -from bamboo_engine.interrupt import ExecuteInterrupter, ExecuteKeyPoint from bamboo_engine.eri import ( - ProcessInfo, - NodeType, - ConditionalParallelGateway, Condition, - ExecuteInterruptPoint, + ConditionalParallelGateway, DefaultCondition, + ExecuteInterruptPoint, + NodeType, + ProcessInfo, ) from bamboo_engine.handlers.conditional_parallel_gateway import ( ConditionalParallelGatewayHandler, ) +from bamboo_engine.interrupt import ExecuteInterrupter, ExecuteKeyPoint +from bamboo_engine.utils.expr import default_expr_func @pytest.fixture @@ -104,7 +105,7 @@ def test_exclusive_gateway__context_hydrate_raise(pi, node, interrupter, recover handler = ConditionalParallelGatewayHandler(node, runtime, interrupter) with patch("bamboo_engine.handlers.conditional_parallel_gateway.Context", MagicMock(return_value=raise_context)): - with patch("bamboo_engine.handlers.conditional_parallel_gateway.BoolRule", MagicMock(side_effect=Exception)): + with patch("bamboo_engine.utils.expr.BoolRule", MagicMock(side_effect=Exception)): result = handler.execute(pi, 1, 1, "v1", recover_point) assert result.should_sleep == True @@ -144,6 +145,7 @@ def test_conditional_parallel_gateway__execute_bool_rule_test_raise(pi, node, in runtime.get_context_values = MagicMock(return_value=[]) runtime.get_execution_data_outputs = MagicMock(return_value={}) runtime.get_data_inputs = MagicMock(return_value={}) + runtime.get_config = MagicMock(return_value=default_expr_func) handler = ConditionalParallelGatewayHandler(node, runtime, interrupter) result = handler.execute(pi, 1, 1, "v1", recover_point) @@ -185,6 +187,7 @@ def test_conditional_parallel_gateway__execute_not_fork_targets(pi, node, interr runtime.get_context_values = MagicMock(return_value=[]) runtime.get_execution_data_outputs = MagicMock(return_value={}) runtime.get_data_inputs = MagicMock(return_value={}) + runtime.get_config = MagicMock(return_value=default_expr_func) handler = ConditionalParallelGatewayHandler(node, runtime, interrupter) result = handler.execute(pi, 1, 1, "v1", recover_point) @@ -234,6 +237,7 @@ def test_conditional_parallel_gateway__execute_success(pi, node, interrupter, re runtime.get_context_values = MagicMock(return_value=[]) runtime.fork = MagicMock(return_value=dispatch_processes) runtime.get_data_inputs = MagicMock(return_value={}) + runtime.get_config = MagicMock(return_value=default_expr_func) handler = ConditionalParallelGatewayHandler(node, runtime, interrupter) result = handler.execute(pi, 1, 1, "v1", recover_point) @@ -281,6 +285,7 @@ def test_conditional_parallel_gateway__recover_with_dispatch_processes(pi, node, runtime.get_context_key_references = MagicMock(return_value=additional_refs) runtime.get_context_values = MagicMock(return_value=[]) runtime.get_data_inputs = MagicMock(return_value={}) + runtime.get_config = MagicMock(return_value=default_expr_func) handler = ConditionalParallelGatewayHandler(node, runtime, interrupter) result = handler.execute(pi, 1, 1, "v1", recover_point) @@ -328,6 +333,7 @@ def test_conditional_parallel_gateway__no_meet_target_with_default_condition(pi, runtime.get_context_values = MagicMock(return_value=[]) runtime.fork = MagicMock(return_value=dispatch_processes) runtime.get_data_inputs = MagicMock(return_value={}) + runtime.get_config = MagicMock(return_value=default_expr_func) handler = ConditionalParallelGatewayHandler(node, runtime, interrupter) result = handler.execute(pi, 1, 1, "v1", recover_point) diff --git a/tests/handlers/test_exclusive_gateway.py b/tests/handlers/test_exclusive_gateway.py index 60a8da5e..27501232 100644 --- a/tests/handlers/test_exclusive_gateway.py +++ b/tests/handlers/test_exclusive_gateway.py @@ -16,15 +16,15 @@ from bamboo_engine import states from bamboo_engine.eri import ( - ProcessInfo, - NodeType, - ExclusiveGateway, Condition, DefaultCondition, + ExclusiveGateway, + NodeType, + ProcessInfo, ) -from bamboo_engine.handlers.exclusive_gateway import ( - ExclusiveGatewayHandler, -) +from bamboo_engine.handlers.exclusive_gateway import ExclusiveGatewayHandler +from bamboo_engine.utils.constants import ExclusiveGatewayStrategy, RuntimeSettings +from bamboo_engine.utils.expr import default_expr_func @pytest.mark.parametrize( @@ -64,13 +64,14 @@ def test_exclusive_gateway__context_hydrate_raise(recover_point): runtime.get_context_values = MagicMock(return_value=[]) runtime.get_execution_data_outputs = MagicMock(return_value={}) runtime.get_data_inputs = MagicMock(return_value={}) + runtime.get_config = MagicMock(return_value=default_expr_func) raise_context = MagicMock() raise_context.hydrate = MagicMock(side_effect=Exception) handler = ExclusiveGatewayHandler(node, runtime, MagicMock()) with patch("bamboo_engine.handlers.exclusive_gateway.Context", MagicMock(return_value=raise_context)): - with patch("bamboo_engine.handlers.exclusive_gateway.BoolRule", MagicMock(side_effect=Exception)): + with patch("bamboo_engine.utils.expr.BoolRule", MagicMock(side_effect=Exception)): result = handler.execute(pi, 1, 1, "v1", recover_point) assert result.should_sleep == True @@ -132,6 +133,7 @@ def test_exclusive_gateway__execute_bool_rule_test_raise(recover_point): runtime.get_context_values = MagicMock(return_value=[]) runtime.get_execution_data_outputs = MagicMock(return_value={}) runtime.get_data_inputs = MagicMock(return_value={}) + runtime.get_config = MagicMock(return_value=default_expr_func) handler = ExclusiveGatewayHandler(node, runtime, MagicMock()) result = handler.execute(pi, 1, 1, "v1", recover_point) @@ -195,6 +197,7 @@ def test_exclusive_gateway__execute_not_meet_targets(recover_point): runtime.get_context_values = MagicMock(return_value=[]) runtime.get_execution_data_outputs = MagicMock(return_value={}) runtime.get_data_inputs = MagicMock(return_value={}) + runtime.get_config = MagicMock(return_value=default_expr_func) handler = ExclusiveGatewayHandler(node, runtime, MagicMock()) result = handler.execute(pi, 1, 1, "v1", recover_point) @@ -262,6 +265,7 @@ def test_exclusive_gateway__execute_not_meet_targets_with_default(recover_point) runtime.get_context_values = MagicMock(return_value=[]) runtime.get_execution_data_outputs = MagicMock(return_value={}) runtime.get_data_inputs = MagicMock(return_value={}) + runtime.get_config = MagicMock(return_value=default_expr_func) handler = ExclusiveGatewayHandler(node, runtime, MagicMock()) result = handler.execute(pi, 1, 1, "v1", recover_point) @@ -323,6 +327,7 @@ def test_exclusive_gateway__execute_mutiple_meet_targets(recover_point): runtime.get_context_values = MagicMock(return_value=[]) runtime.get_execution_data_outputs = MagicMock(return_value={}) runtime.get_data_inputs = MagicMock(return_value={}) + runtime.get_config = MagicMock(return_value=default_expr_func) handler = ExclusiveGatewayHandler(node, runtime, MagicMock()) result = handler.execute(pi, 1, 1, "v1", recover_point) @@ -388,6 +393,7 @@ def test_exclusive_gateway__execute_success(recover_point): runtime.get_context_key_references = MagicMock(return_value=additional_refs) runtime.get_context_values = MagicMock(return_value=[]) runtime.get_data_inputs = MagicMock(return_value={}) + runtime.get_config = MagicMock(return_value=default_expr_func) handler = ExclusiveGatewayHandler(node, runtime, MagicMock) result = handler.execute(pi, 1, 1, "v1", recover_point) @@ -410,3 +416,72 @@ def test_exclusive_gateway__execute_success(recover_point): set_archive_time=True, ignore_boring_set=recover_point is not None, ) + + +@pytest.mark.parametrize( + "recover_point", + [ + pytest.param(MagicMock(), id="recover_is_not_none"), + pytest.param(None, id="recover_is_none"), + ], +) +def test_exclusive_gateway_first_strategy_success(recover_point): + conditions = [ + Condition(name="c1", evaluation="2 == 2", target_id="t1", flow_id="f1"), + Condition(name="c2", evaluation="0 == 1", target_id="t2", flow_id="f2"), + Condition(name="c3", evaluation="1 == 1", target_id="t3", flow_id="f3"), + ] + node = ExclusiveGateway( + conditions=conditions, + id="nid", + type=NodeType.ExclusiveGateway, + target_flows=["f1", "f2", "f3"], + target_nodes=["t1", "t2", "t3"], + targets={"f1": "t1", "f2": "t2", "f3": "t3"}, + root_pipeline_id="root", + parent_pipeline_id="root", + can_skip=True, + ) + pi = ProcessInfo( + process_id="pid", + destination_id="", + root_pipeline_id="root", + pipeline_stack=["root"], + parent_id="parent", + ) + additional_refs = [] + + runtime = MagicMock() + runtime.get_context_key_references = MagicMock(return_value=additional_refs) + runtime.get_context_values = MagicMock(return_value=[]) + runtime.get_data_inputs = MagicMock(return_value={}) + + def get_config(config_name): + if config_name == RuntimeSettings.PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC.value: + return default_expr_func + if config_name == RuntimeSettings.PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY.value: + return ExclusiveGatewayStrategy.FIRST.value + + runtime.get_config = MagicMock(side_effect=get_config) + + handler = ExclusiveGatewayHandler(node, runtime, MagicMock) + result = handler.execute(pi, 1, 1, "v1", recover_point) + + assert result.should_sleep == False + assert result.schedule_ready == False + assert result.schedule_type == None + assert result.schedule_after == -1 + assert result.dispatch_processes == [] + assert result.next_node_id == "t1" + assert result.should_die == False + + runtime.get_data_inputs.assert_called_once_with("root") + runtime.get_context_key_references.assert_called_once_with(pipeline_id=pi.top_pipeline_id, keys=set()) + runtime.get_context_values.assert_called_once_with(pipeline_id=pi.top_pipeline_id, keys=set()) + runtime.set_state.assert_called_once_with( + node_id=node.id, + version="v1", + to_state=states.FINISHED, + set_archive_time=True, + ignore_boring_set=recover_point is not None, + )