Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TT-13201] Streams Definition Validator #6656

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
2,607 changes: 2,607 additions & 0 deletions apidef/streams/bento/schema/bento-v1.2.0-supported-schema.json

Large diffs are not rendered by default.

109 changes: 109 additions & 0 deletions apidef/streams/bento/validator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package bento

import (
"embed"
"errors"
"fmt"
"path/filepath"
"strings"
"sync"

"github.com/hashicorp/go-multierror"

"github.com/TykTechnologies/gojsonschema"
tykerrors "github.com/TykTechnologies/tyk/internal/errors"
)

type ConfigValidator interface {
Validate(document []byte) error
}

type ValidatorKind string

const (
DefaultBentoConfigSchemaName string = "bento-v1.2.0-supported-schema.json"
DefaultValidator ValidatorKind = "default-validator"
)

var (
schemaOnce sync.Once

bentoSchemas = map[ValidatorKind][]byte{}
)

//go:embed schema/*
var schemaDir embed.FS

func loadBentoSchemas() error {
load := func() error {
members, err := schemaDir.ReadDir("schema")
if err != nil {
return fmt.Errorf("listing Bento schemas failed %w", err)
}

bentoSchemas = make(map[ValidatorKind][]byte)
for _, member := range members {
if member.IsDir() {
continue
}

fileName := member.Name()
if !strings.HasSuffix(fileName, ".json") {
// It might be an AsyncAPI schema in YAML format, it is not supported yet.
continue
}

// Load default Bento configuration schema
if fileName == DefaultBentoConfigSchemaName {
var data []byte
data, err = schemaDir.ReadFile(filepath.Join("schema/", DefaultBentoConfigSchemaName))
if err != nil {
return err
}
bentoSchemas[DefaultValidator] = data
}
}
return nil
}

var err error
schemaOnce.Do(func() {
err = load()
})
return err
}

type DefaultConfigValidator struct {
schemaLoader gojsonschema.JSONLoader
}

func NewDefaultConfigValidator() (*DefaultConfigValidator, error) {
err := loadBentoSchemas() // loads the schemas only one time
if err != nil {
return nil, err
}

schema := bentoSchemas[DefaultValidator]
return &DefaultConfigValidator{
schemaLoader: gojsonschema.NewBytesLoader(schema),
}, nil
}

func (v *DefaultConfigValidator) Validate(document []byte) error {
documentLoader := gojsonschema.NewBytesLoader(document)
result, err := gojsonschema.Validate(v.schemaLoader, documentLoader)
if err != nil {
return err
}
if result.Valid() {
return nil
}

combinedErr := &multierror.Error{}
combinedErr.ErrorFormat = tykerrors.Formatter
validationErrs := result.Errors()
for _, validationErr := range validationErrs {
combinedErr = multierror.Append(combinedErr, errors.New(validationErr.String()))
}
return combinedErr.ErrorOrNil()
}
88 changes: 88 additions & 0 deletions apidef/streams/bento/validator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package bento

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestValidateBentoConfiguration(t *testing.T) {
validator, err := NewDefaultConfigValidator()
require.NoError(t, err)

t.Run("Valid Bento Configuration", func(t *testing.T) {
validDocument := []byte(`{
"input": {
"label": "",
"kafka": {
"addresses": [],
"topics": [],
"target_version": "2.1.0",
"consumer_group": "",
"checkpoint_limit": 1024,
"auto_replay_nacks": true
}
}
}`)
err = validator.Validate(validDocument)
require.NoError(t, err)
})

t.Run("Invalid Bento Configuration", func(t *testing.T) {
invalidDocument := []byte(`{
"input": {
"label": "",
"kafka": {
"addresses": [],
"topics": [],
"target_version": "2.1.0",
"consumer_group": "",
"checkpoint_limit": 1024,
"auto_replay_nacks": "some-string"
}
}
}`)

err = validator.Validate(invalidDocument)
require.ErrorContains(t, err, "input.kafka.auto_replay_nacks: Invalid type. Expected: boolean, given: string")
})

t.Run("Allow Additional Properties", func(t *testing.T) {
validDocumentWithAdditionalProperties := []byte(`{
"input": {
"label": "",
"kafka": {
"addresses": [],
"topics": [],
"target_version": "2.1.0",
"consumer_group": "",
"checkpoint_limit": 1024,
"auto_replay_nacks": true
},
"additional": {
"configuration": true
}
},
"output": {
"label": "",
"drop_on": {
"error": false,
"error_patterns": [],
"back_pressure": "30s",
"output": null
},
"aws_sns": {
"topic_arn": "",
"message_group_id": "",
"message_deduplication_id": "",
"max_in_flight": 64,
"metadata": {
"exclude_prefixes": []
}
}
}
}`)
err = validator.Validate(validDocumentWithAdditionalProperties)
require.NoError(t, err)
})
}
17 changes: 17 additions & 0 deletions apidef/streams/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package streams

import (
"github.com/TykTechnologies/tyk/apidef/oas"
)

// XTykStreaming represents the structure for Tyk streaming configurations.
type XTykStreaming struct {
// Info contains the main metadata for the API definition.
Info oas.Info `bson:"info" json:"info"` // required
// Server contains the configurations related to the server.
Server oas.Server `bson:"server" json:"server"` // required
// Streams contains the configurations related to Tyk Streams
Streams map[string]interface{} `bson:"streams" json:"streams"` // required
// Middleware contains the configurations related to the Tyk middleware.
Middleware *oas.Middleware `bson:"middleware,omitempty" json:"middleware,omitempty"`
}
Loading
Loading