Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: plugin input validation support #154

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions runtime/bamboo-pipeline/pipeline/conf/default_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@
PIPELINE_INSTANCE_CONTEXT = getattr(settings, "PIPELINE_INSTANCE_CONTEXT", "")

PIPELINE_ENGINE_ADAPTER_API = getattr(
settings, "PIPELINE_ENGINE_ADAPTER_API", "pipeline.service.pipeline_engine_adapter.adapter_api",
settings,
"PIPELINE_ENGINE_ADAPTER_API",
"pipeline.service.pipeline_engine_adapter.adapter_api",
)

PIPELINE_DATA_BACKEND = getattr(
settings, "PIPELINE_DATA_BACKEND", "pipeline.engine.core.data.mysql_backend.MySQLDataBackend",
settings,
"PIPELINE_DATA_BACKEND",
"pipeline.engine.core.data.mysql_backend.MySQLDataBackend",
)
PIPELINE_DATA_CANDIDATE_BACKEND = getattr(settings, "PIPELINE_DATA_CANDIDATE_BACKEND", None)
PIPELINE_DATA_BACKEND_AUTO_EXPIRE = getattr(settings, "PIPELINE_DATA_BACKEND_AUTO_EXPIRE", False)
Expand All @@ -43,7 +47,9 @@
)

PIPELINE_END_HANDLER = getattr(
settings, "PIPELINE_END_HANDLER", "pipeline.engine.signals.handlers.pipeline_end_handler",
settings,
"PIPELINE_END_HANDLER",
"pipeline.engine.signals.handlers.pipeline_end_handler",
)
PIPELINE_WORKER_STATUS_CACHE_EXPIRES = getattr(settings, "PIPELINE_WORKER_STATUS_CACHE_EXPIRES", 30)
PIPELINE_RERUN_MAX_TIMES = getattr(settings, "PIPELINE_RERUN_MAX_TIMES", 0)
Expand Down Expand Up @@ -95,3 +101,6 @@
# Developer-defined custom plugin and variable exception classes
PLUGIN_SPECIFIC_EXCEPTIONS = getattr(settings, "PLUGIN_SPECIFIC_EXCEPTIONS", ())
VARIABLE_SPECIFIC_EXCEPTIONS = getattr(settings, "VARIABLE_SPECIFIC_EXCEPTIONS", ())

# Whether to enable plugin input validation
PLUGIN_INPUT_VALIDATE_ENABLED = getattr(settings, "PLUGIN_INPUT_VALIDATE_ENABLED", False)
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@

from abc import ABCMeta, abstractmethod
from copy import deepcopy

from django.utils.translation import ugettext_lazy as _

from pipeline.conf import settings
from pipeline.exceptions import ValidationError as PipelineInputValidationError
from pipeline.core.flow.activity.base import Activity
from pipeline.core.flow.io import BooleanItemSchema, InputItem, IntItemSchema, OutputItem
from pipeline.utils.utils import convert_bytes_to_str
Expand Down Expand Up @@ -71,6 +71,20 @@ def execute(self, data, parent_data):
# get params from data
pass

def validate_input(self, data):
    """Validate *data* against the inputs declared by ``inputs_format()``.

    Collects one error message per failing input key and raises a single
    exception containing all of them, so callers see every problem at once
    rather than only the first.

    :param data: mapping of input key -> value to validate
    :raises PipelineInputValidationError: if a required input is missing or
        any present input fails its item validation
    """
    errors = {}
    for item in self.inputs_format():
        if item.key not in data:
            # Only the absence of a *required* key is an error.  Fix: an
            # absent optional key must be skipped entirely — previously
            # item.validate(None) was invoked for missing optional keys.
            if item.required:
                errors[item.key] = "this field is required"
            continue
        try:
            item.validate(data[item.key])
        except Exception as e:  # collect per-key message, keep checking the rest
            errors[item.key] = str(e)

    if errors:
        raise PipelineInputValidationError(errors)

def outputs_format(self):
return []

Expand Down Expand Up @@ -265,13 +279,13 @@ def next(self):
class DefaultIntervalGenerator(AbstractIntervalGenerator):
    """Default polling-interval generator: interval grows quadratically."""

    def next(self):
        # Advance the shared counter in the base class, then return the interval.
        super(DefaultIntervalGenerator, self).next()
        # Fix: removed the unreachable duplicate `return self.count ** 2`
        # line left over from a diff artifact.
        # NOTE(review): identical to SquareIntervalGenerator — confirm intended.
        return self.count**2


class SquareIntervalGenerator(AbstractIntervalGenerator):
    """Polling-interval generator returning the square of the poll count."""

    def next(self):
        # Advance the shared counter in the base class, then return count squared.
        super(SquareIntervalGenerator, self).next()
        # Fix: removed the unreachable duplicate `return self.count ** 2`
        # line left over from a diff artifact.
        return self.count**2


class NullIntervalGenerator(AbstractIntervalGenerator):
Expand Down
37 changes: 36 additions & 1 deletion runtime/bamboo-pipeline/pipeline/core/flow/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,22 @@
"""

import abc

try:
from collections.abc import Mapping
except ImportError:
from collections import Mapping

from pipeline.core.flow.validator import (
StringValidator,
IntValidator,
FloatValidator,
BooleanValidator,
ArrayValidator,
ObjectValidator,
DefaultValidator,
)


class DataItem(object, metaclass=abc.ABCMeta):
def __init__(self, name, key, type, schema=None):
Expand All @@ -39,6 +50,11 @@ def __init__(self, required=True, *args, **kwargs):
self.required = required
super(InputItem, self).__init__(*args, **kwargs)

def validate(self, value):
    """Run the attached schema's validation against *value*.

    A missing schema means the item carries no constraints, so the
    call is a silent no-op in that case.
    """
    if self.schema is not None:
        self.schema.validate(value)

def as_dict(self):
base = super(InputItem, self).as_dict()
base["required"] = self.required
Expand All @@ -50,10 +66,17 @@ class OutputItem(DataItem):


class ItemSchema(object, metaclass=abc.ABCMeta):
    """Base schema for a data item; delegates value checks to a validator.

    Fix: removed the stale pre-change ``__init__`` signature line
    (``def __init__(self, description, enum=None):``) left by a diff
    artifact, which made the class body invalid.
    """

    # Class-level default; instances may override via ``validator_cls``.
    validator = DefaultValidator

    def __init__(self, description, enum=None, validator_cls=None):
        """
        :param description: human-readable description of the item
        :param enum: optional list of allowed values (empty = unrestricted)
        :param validator_cls: optional validator class overriding the default
        """
        self.type = self._type()
        self.description = description
        self.enum = enum or []
        if validator_cls is not None:
            self.validator = validator_cls

    def validate(self, value):
        """Instantiate the validator with this schema and check *value*."""
        self.validator(self).validate(value)

    def as_dict(self):
        """Serialize the schema for API/JSON output."""
        return {"type": self.type, "description": self.description, "enum": self.enum}
Expand All @@ -68,30 +91,40 @@ class SimpleItemSchema(ItemSchema, metaclass=abc.ABCMeta):


class IntItemSchema(SimpleItemSchema):
    """Schema for an integer-typed input item; values checked by IntValidator."""

    validator = IntValidator

    @classmethod
    def _type(cls):
        # Type tag used by as_dict() and for validator dispatch.
        return "int"


class StringItemSchema(SimpleItemSchema):
    """Schema for a string-typed input item; values checked by StringValidator."""

    validator = StringValidator

    @classmethod
    def _type(cls):
        # Type tag used by as_dict() and for validator dispatch.
        return "string"


class FloatItemSchema(SimpleItemSchema):
    """Schema for a float-typed input item; values checked by FloatValidator."""

    validator = FloatValidator

    @classmethod
    def _type(cls):
        # Type tag used by as_dict() and for validator dispatch.
        return "float"


class BooleanItemSchema(SimpleItemSchema):
    """Schema for a boolean-typed input item; values checked by BooleanValidator."""

    validator = BooleanValidator

    @classmethod
    def _type(cls):
        # Type tag used by as_dict() and for validator dispatch.
        return "boolean"


class ArrayItemSchema(ItemSchema):
validator = ArrayValidator

def __init__(self, item_schema, *args, **kwargs):
if not isinstance(item_schema, ItemSchema):
raise TypeError("item_schema of ArrayItemSchema must be subclass of ItemSchema")
Expand All @@ -109,6 +142,8 @@ def _type(cls):


class ObjectItemSchema(ItemSchema):
validator = ObjectValidator

def __init__(self, property_schemas, *args, **kwargs):
if not isinstance(property_schemas, Mapping):
raise TypeError("property_schemas of ObjectItemSchema must be Mapping type")
Expand Down
84 changes: 84 additions & 0 deletions runtime/bamboo-pipeline/pipeline/core/flow/validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
import abc

from pipeline.exceptions import ValidationError


class BaseValidator(metaclass=abc.ABCMeta):
    """Validate a single value against an ItemSchema-like object.

    Subclasses set ``type`` to the Python type the value must be an
    instance of.  The ``schema`` object is expected to expose ``enum``
    (and, for container validators, ``item_schema``/``property_schemas``).
    """

    type = None

    def __init__(self, schema):
        self.schema = schema

    def validate(self, value):
        """Raise TypeError on wrong type, ValidationError on enum mismatch."""
        # NOTE(review): isinstance(True, int) is True, so IntValidator also
        # accepts booleans — confirm whether that is acceptable.
        if not isinstance(value, self.type):
            raise TypeError("validator error,value: {} is not {} type".format(value, self.type))
        if self.schema.enum and value not in self.schema.enum:
            raise ValidationError("value: {} not in {}".format(value, self.schema.enum))


class DefaultValidator(BaseValidator):
    """Fallback validator that accepts any value without checks."""

    def validate(self, value):
        pass


class StringValidator(BaseValidator):
    type = str


class IntValidator(BaseValidator):
    type = int


class BooleanValidator(BaseValidator):
    type = bool


class FloatValidator(BaseValidator):
    type = float


class ObjectValidator(BaseValidator):
    """Validate a dict value against ``schema.property_schemas``."""

    type = dict

    def validate(self, value):
        if not isinstance(value, dict):
            raise TypeError("validate error,value must be {}".format(self.type))
        if self.schema.property_schemas:
            # The dict's keys must match the declared property keys exactly.
            if set(value.keys()) != self.schema.property_schemas.keys():
                raise ValidationError(
                    "validate error,it must have this keys:{}".format(self.schema.property_schemas.keys())
                )
        for key, v in value.items():
            schema_cls = self.schema.property_schemas.get(key)
            if schema_cls is None:
                # Fix: previously this path crashed with AttributeError
                # (None.as_dict) when no schema was declared for the key,
                # e.g. with an empty property_schemas mapping.
                raise ValidationError("validate error,no schema declared for key:{}".format(key))
            value_type = schema_cls.as_dict()["type"]
            validator = VALIDATOR_MAP.get(value_type)(schema_cls)
            validator.validate(v)


class ArrayValidator(BaseValidator):
    """Validate a list value; each element is checked against ``schema.item_schema``."""

    type = list

    def validate(self, value):
        if not isinstance(value, list):
            raise TypeError("validate error,value must be {}".format(self.type))

        value_type = self.schema.item_schema.as_dict()["type"]
        # Fix: use a local variable instead of mutating self.schema — the old
        # code rebound self.schema to the nested item schema, corrupting the
        # validator instance so a second validate() call failed for nested
        # array/object schemas.  (For simple element types the original
        # passed the array schema itself to the element validator; preserved.)
        element_schema = self.schema.item_schema if value_type in ["object", "array"] else self.schema

        validator = VALIDATOR_MAP.get(value_type)(element_schema)

        for v in value:
            validator.validate(v)


# Maps the type tags returned by ItemSchema._type() to validator classes.
VALIDATOR_MAP = {
    "string": StringValidator,
    "int": IntValidator,
    "float": FloatValidator,
    "boolean": BooleanValidator,
    "array": ArrayValidator,
    "object": ObjectValidator,
}
23 changes: 23 additions & 0 deletions runtime/bamboo-pipeline/pipeline/eri/imp/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@

from typing import Optional

from pipeline.conf import settings
from pipeline.component_framework.library import ComponentLibrary
from pipeline.eri.models import LogEntry
from pipeline.exceptions import ValidationError


class HooksMixin:
Expand All @@ -32,6 +35,26 @@ def pre_prepare_run_pipeline(
:param subprocess_context 子流程预置流程上下文
:type subprocess_context: dict
"""
if not settings.PLUGIN_INPUT_VALIDATE_ENABLED:
return

errors = {}

for activity_id, activity in pipeline["activities"].items():
code = activity["component"].get("code")
inputs = activity["component"].get("inputs", {})
version = activity["component"].get("version", "legacy")
data = {key: value["value"] for key, value in inputs.items()}
component_class = ComponentLibrary.get_component_class(component_code=code, version=version)
if not component_class:
raise ValidationError("not fond component by code={}, version={}".format(code, version))
try:
component_class.bound_service().validate_input(data)
except Exception as e:
errors.setdefault("{}({})".format(code, activity_id), []).append(str(e))

if errors:
raise ValidationError(errors)

def post_prepare_run_pipeline(
self, pipeline: dict, root_pipeline_data: dict, root_pipeline_context: dict, subprocess_context: dict, **options
Expand Down
7 changes: 7 additions & 0 deletions runtime/bamboo-pipeline/pipeline/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,10 @@ class InsufficientVariableError(ContextError):
#
class InvalidCrontabException(PipelineException):
pass


#
# pipeline input validation exception
#
class ValidationError(PipelineException):
    """Raised when pipeline plugin input data fails validation."""

    pass
Loading
Loading