diff --git a/runtime/bamboo-pipeline/pipeline/conf/default_settings.py b/runtime/bamboo-pipeline/pipeline/conf/default_settings.py index da19ad34..ecd25402 100644 --- a/runtime/bamboo-pipeline/pipeline/conf/default_settings.py +++ b/runtime/bamboo-pipeline/pipeline/conf/default_settings.py @@ -30,11 +30,15 @@ PIPELINE_INSTANCE_CONTEXT = getattr(settings, "PIPELINE_INSTANCE_CONTEXT", "") PIPELINE_ENGINE_ADAPTER_API = getattr( - settings, "PIPELINE_ENGINE_ADAPTER_API", "pipeline.service.pipeline_engine_adapter.adapter_api", + settings, + "PIPELINE_ENGINE_ADAPTER_API", + "pipeline.service.pipeline_engine_adapter.adapter_api", ) PIPELINE_DATA_BACKEND = getattr( - settings, "PIPELINE_DATA_BACKEND", "pipeline.engine.core.data.mysql_backend.MySQLDataBackend", + settings, + "PIPELINE_DATA_BACKEND", + "pipeline.engine.core.data.mysql_backend.MySQLDataBackend", ) PIPELINE_DATA_CANDIDATE_BACKEND = getattr(settings, "PIPELINE_DATA_CANDIDATE_BACKEND", None) PIPELINE_DATA_BACKEND_AUTO_EXPIRE = getattr(settings, "PIPELINE_DATA_BACKEND_AUTO_EXPIRE", False) @@ -43,7 +47,9 @@ ) PIPELINE_END_HANDLER = getattr( - settings, "PIPELINE_END_HANDLER", "pipeline.engine.signals.handlers.pipeline_end_handler", + settings, + "PIPELINE_END_HANDLER", + "pipeline.engine.signals.handlers.pipeline_end_handler", ) PIPELINE_WORKER_STATUS_CACHE_EXPIRES = getattr(settings, "PIPELINE_WORKER_STATUS_CACHE_EXPIRES", 30) PIPELINE_RERUN_MAX_TIMES = getattr(settings, "PIPELINE_RERUN_MAX_TIMES", 0) @@ -95,3 +101,6 @@ # 开发者自定义插件和变量异常类 PLUGIN_SPECIFIC_EXCEPTIONS = getattr(settings, "PLUGIN_SPECIFIC_EXCEPTIONS", ()) VARIABLE_SPECIFIC_EXCEPTIONS = getattr(settings, "VARIABLE_SPECIFIC_EXCEPTIONS", ()) + +# 是否开启输入输出校验 +PLUGIN_INPUT_VALIDATE_ENABLED = getattr(settings, "PLUGIN_INPUT_VALIDATE_ENABLED", False) diff --git a/runtime/bamboo-pipeline/pipeline/core/flow/activity/service_activity.py b/runtime/bamboo-pipeline/pipeline/core/flow/activity/service_activity.py index 2a1fa53d..30bac92c 100644 --- a/runtime/bamboo-pipeline/pipeline/core/flow/activity/service_activity.py +++ b/runtime/bamboo-pipeline/pipeline/core/flow/activity/service_activity.py @@ -13,10 +13,10 @@ from abc import ABCMeta, abstractmethod from copy import deepcopy - from django.utils.translation import ugettext_lazy as _ from pipeline.conf import settings +from pipeline.exceptions import ValidationError as PipelineInputValidationError from pipeline.core.flow.activity.base import Activity from pipeline.core.flow.io import BooleanItemSchema, InputItem, IntItemSchema, OutputItem from pipeline.utils.utils import convert_bytes_to_str @@ -71,6 +71,20 @@ def execute(self, data, parent_data): # get params from data pass + def validate_input(self, data): + errors = {} + for item in self.inputs_format(): + if item.required and item.key not in data: + errors[item.key] = "this field is required" + continue + try: + item.validate(data.get(item.key)) + except Exception as e: + errors[item.key] = str(e) + + if errors: + raise PipelineInputValidationError(errors) + def outputs_format(self): return [] @@ -265,13 +279,13 @@ def next(self): class DefaultIntervalGenerator(AbstractIntervalGenerator): def next(self): super(DefaultIntervalGenerator, self).next() - return self.count ** 2 + return self.count**2 class SquareIntervalGenerator(AbstractIntervalGenerator): def next(self): super(SquareIntervalGenerator, self).next() - return self.count ** 2 + return self.count**2 class NullIntervalGenerator(AbstractIntervalGenerator): diff --git a/runtime/bamboo-pipeline/pipeline/core/flow/io.py b/runtime/bamboo-pipeline/pipeline/core/flow/io.py index efc7f6f3..c02e24c6 100644 --- a/runtime/bamboo-pipeline/pipeline/core/flow/io.py +++ b/runtime/bamboo-pipeline/pipeline/core/flow/io.py @@ -12,11 +12,22 @@ """ import abc + try: from collections.abc import Mapping except ImportError: from collections import Mapping +from pipeline.core.flow.validator import ( + StringValidator, + IntValidator, + FloatValidator, + BooleanValidator, + ArrayValidator, + ObjectValidator, + DefaultValidator, +) + class DataItem(object, metaclass=abc.ABCMeta): def __init__(self, name, key, type, schema=None): @@ -39,6 +50,11 @@ def __init__(self, required=True, *args, **kwargs): self.required = required super(InputItem, self).__init__(*args, **kwargs) + def validate(self, value): + if self.schema is None: + return + self.schema.validate(value) + def as_dict(self): base = super(InputItem, self).as_dict() base["required"] = self.required @@ -50,10 +66,17 @@ class OutputItem(DataItem): class ItemSchema(object, metaclass=abc.ABCMeta): - def __init__(self, description, enum=None): + validator = DefaultValidator + + def __init__(self, description, enum=None, validator_cls=None): self.type = self._type() self.description = description self.enum = enum or [] + if validator_cls is not None: + self.validator = validator_cls + + def validate(self, value): + self.validator(self).validate(value) def as_dict(self): return {"type": self.type, "description": self.description, "enum": self.enum} @@ -68,30 +91,40 @@ class SimpleItemSchema(ItemSchema, metaclass=abc.ABCMeta): class IntItemSchema(SimpleItemSchema): + validator = IntValidator + @classmethod def _type(cls): return "int" class StringItemSchema(SimpleItemSchema): + validator = StringValidator + @classmethod def _type(cls): return "string" class FloatItemSchema(SimpleItemSchema): + validator = FloatValidator + @classmethod def _type(cls): return "float" class BooleanItemSchema(SimpleItemSchema): + validator = BooleanValidator + @classmethod def _type(cls): return "boolean" class ArrayItemSchema(ItemSchema): + validator = ArrayValidator + def __init__(self, item_schema, *args, **kwargs): if not isinstance(item_schema, ItemSchema): raise TypeError("item_schema of ArrayItemSchema must be subclass of ItemSchema") @@ -109,6 +142,8 @@ def _type(cls): class ObjectItemSchema(ItemSchema): + validator = ObjectValidator + def __init__(self, property_schemas, *args, **kwargs): if not isinstance(property_schemas, Mapping): raise TypeError("property_schemas of ObjectItemSchema must be Mapping type") diff --git a/runtime/bamboo-pipeline/pipeline/core/flow/validator.py b/runtime/bamboo-pipeline/pipeline/core/flow/validator.py new file mode 100644 index 00000000..80f959dc --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/core/flow/validator.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +import abc + +from pipeline.exceptions import ValidationError + + +class BaseValidator(metaclass=abc.ABCMeta): + type = None + + def __init__(self, schema): + self.schema = schema + + def validate(self, value): + if not isinstance(value, self.type): + raise TypeError("validator error,value: {} is not {} type".format(value, self.type)) + if self.schema.enum and value not in self.schema.enum: + raise ValidationError("value: {} not in {}".format(value, self.schema.enum)) + + +class DefaultValidator(BaseValidator): + def validate(self, value): + pass + + +class StringValidator(BaseValidator): + type = str + + +class IntValidator(BaseValidator): + type = int + + +class BooleanValidator(BaseValidator): + type = bool + + +class FloatValidator(BaseValidator): + type = float + + +class ObjectValidator(BaseValidator): + type = dict + + def validate(self, value): + if not isinstance(value, dict): + raise TypeError("validate error,value must be {}".format(self.type)) + if self.schema.property_schemas: + if set(value.keys()) != self.schema.property_schemas.keys(): + # 判断字典的key是否和预期保持一致 + raise ValidationError( + "validate error,it must have this keys:{}".format(self.schema.property_schemas.keys()) + ) + for key, v in value.items(): + schema_cls = self.schema.property_schemas.get(key) + value_type = schema_cls.as_dict()["type"] + validator = VALIDATOR_MAP.get(value_type)(schema_cls) + validator.validate(v) + + +class ArrayValidator(BaseValidator): + type = list + + def validate(self, value): + if not isinstance(value, list): + raise TypeError("validate error,value must be {}".format(self.type)) + + value_type = self.schema.item_schema.as_dict()["type"] + if value_type in ["object", "array"]: + self.schema = self.schema.item_schema + + validator = VALIDATOR_MAP.get(value_type)(self.schema) + + for v in value: + validator.validate(v) + + +VALIDATOR_MAP = { + "string": StringValidator, + "int": IntValidator, + "float": FloatValidator, + "boolean": BooleanValidator, + "array": ArrayValidator, + "object": ObjectValidator, +} diff --git a/runtime/bamboo-pipeline/pipeline/eri/imp/hooks.py b/runtime/bamboo-pipeline/pipeline/eri/imp/hooks.py index 749fbd71..0a15a3f1 100644 --- a/runtime/bamboo-pipeline/pipeline/eri/imp/hooks.py +++ b/runtime/bamboo-pipeline/pipeline/eri/imp/hooks.py @@ -13,7 +13,10 @@ from typing import Optional +from pipeline.conf import settings +from pipeline.component_framework.library import ComponentLibrary from pipeline.eri.models import LogEntry +from pipeline.exceptions import ValidationError class HooksMixin: @@ -32,6 +35,26 @@ def pre_prepare_run_pipeline( :param subprocess_context 子流程预置流程上下文 :type subprocess_context: dict """ + if not settings.PLUGIN_INPUT_VALIDATE_ENABLED: + return + + errors = {} + + for activity_id, activity in pipeline["activities"].items(): + code = activity["component"].get("code") + inputs = activity["component"].get("inputs", {}) + version = activity["component"].get("version", "legacy") + data = {key: value["value"] for key, value in inputs.items()} + component_class = ComponentLibrary.get_component_class(component_code=code, version=version) + if not component_class: + raise ValidationError("not fond component by code={}, version={}".format(code, version)) + try: + component_class.bound_service().validate_input(data) + except Exception as e: + errors.setdefault("{}({})".format(code, activity_id), []).append(str(e)) + + if errors: + raise ValidationError(errors) def post_prepare_run_pipeline( self, pipeline: dict, root_pipeline_data: dict, root_pipeline_context: dict, subprocess_context: dict, **options diff --git a/runtime/bamboo-pipeline/pipeline/exceptions.py b/runtime/bamboo-pipeline/pipeline/exceptions.py index 932b8c18..abe31285 100644 --- a/runtime/bamboo-pipeline/pipeline/exceptions.py +++ b/runtime/bamboo-pipeline/pipeline/exceptions.py @@ -193,3 +193,10 @@ class InsufficientVariableError(ContextError): # class InvalidCrontabException(PipelineException): pass + + +# +# pipline input exception +# +class ValidationError(PipelineException): + pass diff --git a/runtime/bamboo-pipeline/pipeline/tests/core/flow/io/test_vaidate.py b/runtime/bamboo-pipeline/pipeline/tests/core/flow/io/test_vaidate.py new file mode 100644 index 00000000..b5a72304 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/tests/core/flow/io/test_vaidate.py @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- + +from django.test import TestCase +from pipeline.core.flow.io import ( + InputItem, + StringItemSchema, + IntItemSchema, + FloatItemSchema, + BooleanItemSchema, + ArrayItemSchema, + ObjectItemSchema, +) +from pipeline.exceptions import ValidationError + + +class InputItemValidateTestCase(TestCase): + def setUp(self): + self.description = "a simple item" + self.enum = ["1", "2", "3"] + + def test_str_validate(self): + item = InputItem( + name="input item", + key="test", + type="string", + schema=StringItemSchema(description="string schema", enum=["1", "2", "3"]), + ) + self.assertRaises(TypeError, item.validate, 1) + self.assertRaises(ValidationError, item.validate, "5") + item.validate("1") + + def test_int_validate(self): + item = InputItem( + name="input item", key="test", type="int", schema=IntItemSchema(description="int schema", enum=[1, 2]) + ) + + self.assertRaises(TypeError, item.validate, "1") + self.assertRaises(ValidationError, item.validate, 5) + item.validate(1) + + def test_float_validate(self): + item = InputItem( + name="input item", + key="test", + type="float", + schema=FloatItemSchema(description="float schema", enum=[1.1, 2.2]), + ) + + self.assertRaises(TypeError, item.validate, "1") + self.assertRaises(ValidationError, item.validate, 2.3) + item.validate(1.1) + + def test_boolean_validate(self): + item = InputItem( + name="input item", + key="test", + type="boolean", + schema=BooleanItemSchema(description="boolean schema", enum=[True]), + ) + + self.assertRaises(TypeError, item.validate, "1") + self.assertRaises(ValidationError, item.validate, False) + item.validate(True) + + def test_array_validate(self): + item = InputItem( + name="input item", + key="test", + type="array", + schema=ArrayItemSchema( + description="array schema", + item_schema=StringItemSchema(description="array schema", enum=["1", "2", "3"]), + ), + ) + + self.assertRaises(TypeError, item.validate, "1") + self.assertRaises(TypeError, item.validate, [1, 2, 3]) + item.validate(["1"]) + + def test_object_validate(self): + item = InputItem( + name="input item", + key="test", + type="object", + schema=ObjectItemSchema( + description="boolean schema", + property_schemas={ + "a": StringItemSchema(description="string schema"), + "b": IntItemSchema(description="int schema"), + }, + ), + ) + + self.assertRaises(TypeError, item.validate, "1") + self.assertRaises(TypeError, item.validate, {"a": 1, "b": 1}) + self.assertRaises(ValidationError, item.validate, {"a": "1", "b": 1, "c": 1}) + item.validate({"a": "1", "b": 1}) + + def test_mix_validate(self): + item = InputItem( + name="用户详情", + key="userinfo", + type="object", + required=True, + schema=ObjectItemSchema( + description="用户基本信息", + property_schemas={ + "username": StringItemSchema(description="用户名"), + "phone": IntItemSchema(description="手机号"), + "other": ArrayItemSchema( + description="用户其他信息", + item_schema=ObjectItemSchema( + description="用户其他信息", + property_schemas={ + "gender": StringItemSchema(description="性别", enum=["男", "女"]), + "age": IntItemSchema(description="年龄"), + }, + ), + ), + }, + ), + ) + + data = {"username": "test", "phone": 123456, "other": [{"gender": "未知", "age": 18}]} + + self.assertRaises(ValidationError, item.validate, data) + + data["other"][0]["gender"] = "男" + + item.validate(data)