Skip to content

Commit

Permalink
feature: 网关支持自定义表达式和策略 (TencentBlueKing#161)
Browse files Browse the repository at this point in the history
* feature: 网关支持自定义表达式和策略

* test: 修复单元测试

* test: 单元测试修复

* test: 新增分支命中策略的单元测试

* minor: get_config 方法签名修改

* minor: 并行网关和分支网关配置合并

* minor: 配置项统一添加前缀

* test: fix单元测试

* minor: 补充config的校验逻辑
  • Loading branch information
hanshuaikang authored Jul 17, 2023
1 parent 48957b4 commit ae818d0
Show file tree
Hide file tree
Showing 14 changed files with 348 additions and 61 deletions.
27 changes: 27 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
default_stages: [ commit ]
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.1.0
hooks:
- id: check-merge-conflict
- repo: https://github.com/psf/black
rev: 22.3.0
hooks:
- id: black
language_version: python3
- repo: https://github.com/pycqa/isort
rev: 5.6.4
hooks:
- id: isort
args: ["--profile", "black", "--filter-files"]
- repo: https://github.com/pycqa/flake8
rev: 5.0.4
hooks:
- id: flake8
language_version: python3
- repo: https://github.com/alessandrojcm/commitlint-pre-commit-hook
rev: v2.2.0
hooks:
- id: commitlint
stages: [ commit-msg ]
additional_dependencies: [ '@commitlint/config-conventional' ]
6 changes: 6 additions & 0 deletions bamboo_engine/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from bamboo_engine.utils.constants import ExclusiveGatewayStrategy
from bamboo_engine.utils.expr import default_expr_func

# 引擎内部配置模块

Expand Down Expand Up @@ -64,3 +66,7 @@ class Settings:
MAKO_SANDBOX_IMPORT_MODULES = {}

RERUN_INDEX_OFFSET = 0

PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC = default_expr_func

PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY = ExclusiveGatewayStrategy.ONLY.value
35 changes: 22 additions & 13 deletions bamboo_engine/eri/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,29 @@
specific language governing permissions and limitations under the License.
"""

from datetime import datetime
from abc import ABCMeta, abstractmethod
from typing import List, Optional, Dict, Set, Any, Tuple
from datetime import datetime
from typing import Any, Dict, List, Optional, Set, Tuple

from .models import (
ScheduleInterruptPoint,
State,
Node,
Schedule,
ScheduleType,
CallbackData,
ContextValue,
Data,
DataInput,
DispatchProcess,
ExecuteInterruptEvent,
ExecuteInterruptPoint,
ExecutionData,
ExecutionHistory,
ExecutionShortHistory,
CallbackData,
Node,
ProcessInfo,
SuspendedProcessInfo,
DispatchProcess,
ContextValue,
ExecuteInterruptPoint,
ExecuteInterruptEvent,
Schedule,
ScheduleInterruptEvent,
ScheduleInterruptPoint,
ScheduleType,
State,
SuspendedProcessInfo,
)

# plugin interface
Expand Down Expand Up @@ -1471,6 +1471,14 @@ def handle_schedule_interrupt_event(self, event: ScheduleInterruptEvent):
"""


class ConfigMixin:
@abstractmethod
def get_config(self, name):
"""
获取配置
"""


class EngineRuntimeInterface(
PluginManagerMixin,
EngineAPIHooksMixin,
Expand All @@ -1483,6 +1491,7 @@ class EngineRuntimeInterface(
DataMixin,
ExecutionHistoryMixin,
InterruptMixin,
ConfigMixin,
metaclass=ABCMeta,
):
@abstractmethod
Expand Down
15 changes: 8 additions & 7 deletions bamboo_engine/handlers/conditional_parallel_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

from pyparsing import ParseException

from bamboo_engine.utils.boolrule import BoolRule
from bamboo_engine.template.template import Template
from bamboo_engine.interrupt import ExecuteKeyPoint
from bamboo_engine import states, metrics
from bamboo_engine.eri import NodeType, ProcessInfo, ExecuteInterruptPoint
from bamboo_engine import metrics, states
from bamboo_engine.context import Context
from bamboo_engine.handler import register_handler, NodeHandler, ExecuteResult
from bamboo_engine.eri import ExecuteInterruptPoint, NodeType, ProcessInfo
from bamboo_engine.handler import ExecuteResult, NodeHandler, register_handler
from bamboo_engine.interrupt import ExecuteKeyPoint
from bamboo_engine.template.template import Template
from bamboo_engine.utils.constants import RuntimeSettings
from bamboo_engine.utils.string import transform_escape_char

logger = logging.getLogger("bamboo_engine")
Expand Down Expand Up @@ -109,7 +109,8 @@ def execute(
)

try:
result = BoolRule(resolved_evaluate).test()
expr_func = self.runtime.get_config(RuntimeSettings.PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC.value)
result = expr_func(resolved_evaluate, hydrated_context)
logger.info(
"root_pipeline[%s] node(%s) %s test result: %s",
root_pipeline_id,
Expand Down
19 changes: 13 additions & 6 deletions bamboo_engine/handlers/exclusive_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@

from pyparsing import ParseException

from bamboo_engine import states, metrics
from bamboo_engine import metrics, states
from bamboo_engine.context import Context
from bamboo_engine.eri import ExecuteInterruptPoint, NodeType, ProcessInfo
from bamboo_engine.handler import ExecuteResult, NodeHandler, register_handler
from bamboo_engine.template import Template
from bamboo_engine.handler import register_handler, NodeHandler, ExecuteResult
from bamboo_engine.utils.boolrule import BoolRule
from bamboo_engine.eri import NodeType, ProcessInfo, ExecuteInterruptPoint

from bamboo_engine.utils.constants import ExclusiveGatewayStrategy, RuntimeSettings
from bamboo_engine.utils.string import transform_escape_char

logger = logging.getLogger("bamboo_engine")
Expand Down Expand Up @@ -115,14 +114,22 @@ def execute(
hydrated_context,
)
try:
result = BoolRule(resolved_evaluate).test()
expr_func = self.runtime.get_config(RuntimeSettings.PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC.value)
result = expr_func(resolved_evaluate, hydrated_context)
logger.info(
"root_pipeline[%s] node(%s) %s test result: %s",
root_pipeline_id,
self.node.id,
resolved_evaluate,
result,
)

strategy = self.runtime.get_config(RuntimeSettings.PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY.value)
# 如果策略是命中第一个,并且result为true, 则直接结束循环
if strategy == ExclusiveGatewayStrategy.FIRST.value and result:
meet_conditions.append(c.name)
meet_targets.append(c.target_id)
break
except ParseException as e:
logger.exception(f"[exclusive_gateway] evaluation parse error: {e}")
return self._execute_fail(
Expand Down
25 changes: 23 additions & 2 deletions bamboo_engine/utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,34 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

from enum import Enum

from bamboo_engine.eri import ContextValueType


VAR_CONTEXT_MAPPING = {
"plain": ContextValueType.PLAIN,
"splice": ContextValueType.SPLICE,
"lazy": ContextValueType.COMPUTE,
}


class ExclusiveGatewayStrategy(Enum):
"""
网关命中策略
"""

# 唯一命中
ONLY = 1
# 优先命中第一个
FIRST = 2


class RuntimeSettings(Enum):
PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC = "PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC"
PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY = "PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY"


RUNTIME_ALLOWED_CONFIG = [
RuntimeSettings.PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC.value,
RuntimeSettings.PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY.value,
]
6 changes: 6 additions & 0 deletions bamboo_engine/utils/expr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
from bamboo_engine.utils.boolrule import BoolRule


def default_expr_func(expr: str, context: dict) -> bool:
return BoolRule(expr).test()
64 changes: 64 additions & 0 deletions commitlint.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
const Configuration = {
/*
* Resolve and load @commitlint/config-conventional from node_modules.
* Referenced packages must be installed
*/
extends: [
'@commitlint/config-conventional'
],
/*
* Resolve and load conventional-changelog-atom from node_modules.
* Referenced packages must be installed
*/
formatter: '@commitlint/format',
/*
* Any rules defined here will override rules from @commitlint/config-conventional
*/
rules: {
'type-enum': [
2,
'always',
[
'feature',
'bugfix',
'minor',
'optimization',
'sprintfix',
'refactor',
'test',
'docs',
'merge',
],
]
},
/*
* Functions that return true if commitlint should ignore the given message.
*/
ignores: [
(commit) => commit === '',
(message) => message.includes('Merge'),
(message) => message.includes('merge')
],
/*
* Whether commitlint uses the default ignore rules.
*/
defaultIgnores: true,
/*
* Custom URL to show upon failure
*/
helpUrl:
'https://github.com/conventional-changelog/commitlint/#what-is-commitlint',
/*
* Custom prompt configs
*/
prompt: {
messages: {},
questions: {
type: {
description: 'please input type:',
},
},
},
};

module.exports = Configuration;
33 changes: 33 additions & 0 deletions runtime/bamboo-pipeline/pipeline/eri/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
"""

from django.apps import AppConfig
from django.conf import settings
from pipeline.exceptions import ConfigValidationError

from bamboo_engine.handlers import register
from bamboo_engine.utils.constants import ExclusiveGatewayStrategy


class ERIConfig(AppConfig):
Expand All @@ -24,3 +27,33 @@ def ready(self):
from .celery.tasks import execute, schedule # noqa

register()

# 校验 PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC 配置
if hasattr(settings, "PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC"):

pipeline_exclusive_gateway_expr_func = getattr(settings, "PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC", None)
if not callable(pipeline_exclusive_gateway_expr_func):
raise ConfigValidationError("config validate error, the expr func must be callable, please check it")

# 是否校验
pipeline_exclusive_gateway_expr_func_check = getattr(
settings, "PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC_CHECK", False
)
if not pipeline_exclusive_gateway_expr_func_check:
# 获取校验文本
pipeline_exclusive_gateway_expr_func_text = getattr(
settings, "PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC_TEXT", "1==1"
)
check_result = pipeline_exclusive_gateway_expr_func(pipeline_exclusive_gateway_expr_func_text, {})
if not check_result:
raise ConfigValidationError("config validate error, the expr func return False")

if hasattr(settings, "PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY"):
pipeline_exclusive_gateway_strategy = getattr(settings, "PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY")
if pipeline_exclusive_gateway_strategy not in [
ExclusiveGatewayStrategy.ONLY.value,
ExclusiveGatewayStrategy.FIRST.value,
]:
raise ConfigValidationError(
"config validate error, the pipeline_exclusive_gateway_strategy only support 1(ONLY), 2(FIRST)"
)
16 changes: 16 additions & 0 deletions runtime/bamboo-pipeline/pipeline/eri/imp/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
from django.conf import settings

from bamboo_engine.config import Settings
from bamboo_engine.utils.constants import RUNTIME_ALLOWED_CONFIG


class ConfigMixin:
def get_config(self, name):
if name not in RUNTIME_ALLOWED_CONFIG:
raise ValueError("unsupported pipeline config, name={}".format(name))

custom_config_value = getattr(settings, name, None)
if custom_config_value:
return custom_config_value
return getattr(Settings, name)
Loading

0 comments on commit ae818d0

Please sign in to comment.