Skip to content

Commit

Permalink
convert dag.Step struct to protocol buffers (#449)
Browse files Browse the repository at this point in the history
* introduce protocol buffers

* fix proto

* fix dirname

* stepをprotobufに移行

* convert step from gostruct to protobuf

* Merge branch 'feature/protocol-buffers' of https://github.com/garunitule/dagu into feature/protocol-buffers

* validate init value

* change file name

* modify test.yaml

* delete testdag

* delete unused files

* delete unused file

* refactor convertPbAnyToInterface

* change file name

* fix ci issue

---------

Co-authored-by: yohamta <[email protected]>
  • Loading branch information
garunitule and yohamta authored Aug 10, 2023
1 parent 4bf74aa commit c09deaf
Show file tree
Hide file tree
Showing 10 changed files with 359 additions and 38 deletions.
14 changes: 11 additions & 3 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,19 @@ jobs:
uses: actions/setup-go@v3
with:
go-version: 1.19.x


- name: Install Protoc
uses: arduino/setup-protoc@v2

- name: Installing protoc-gen-go
run: |
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
- name: Set up Nodejs
uses: actions/setup-node@v3
with:
node-version: 16
node-version: 16

- name: Set up yarn
run: npm install --global yarn
Expand Down Expand Up @@ -42,7 +50,7 @@ jobs:
${{ runner.os }}-go-
- name: Build
run: |
mkdir ./bin && go build -o ./bin/dagu .
mkdir ./bin && protoc -I=./ --go_out=./internal ./internal/proto/*.proto && go build -o ./bin/dagu .
- name: Test
run: |
go test -v -coverprofile="coverage.txt" -covermode=atomic ./...
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ bin/*
dist/
internal/admin/handlers/web/assets/fonts/*
internal/admin/handlers/web/assets/js/*
internal/**/*.pb.go

# NVM
.nvmrc
Expand Down
12 changes: 8 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
.PHONY: build server scheduler test

### Variables ###
SRC_DIR=./
DST_DIR=$(SRC_DIR)/internal
BUILD_VERSION=$(shell date +'%y%m%d%H%M%S')
LDFLAGS=-X 'main.version=$(BUILD_VERSION)'

Expand All @@ -9,6 +11,11 @@ VERSION=
DOCKER_CMD := docker buildx build --platform linux/amd64,linux/arm64,linux/arm/v7 --build-arg VERSION=$(VERSION) --push --no-cache

### Commands ###
gen-pb:
protoc -I=$(SRC_DIR) --go_out=$(DST_DIR) $(SRC_DIR)/internal/proto/*.proto

build-bin:
go build -ldflags="$(LDFLAGS)" -o ./bin/dagu .

server:
go build -ldflags="$(LDFLAGS)" -o ./bin/dagu .
Expand All @@ -21,16 +28,13 @@ scheduler: build-dir
build-dir:
@mkdir -p ./bin

build: build-admin build-dir build-bin
build: build-admin build-dir gen-pb build-bin

build-admin:
@cd admin; \
yarn && yarn build
@cp admin/dist/bundle.js ./internal/admin/handlers/web/assets/js/

build-bin:
@go build -ldflags="$(LDFLAGS)" -o ./bin/dagu .

test:
@go test -v ./...

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ require (
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/tools v0.1.12 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gotest.tools/v3 v3.4.0 // indirect
)
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
Expand Down Expand Up @@ -543,6 +544,9 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
Expand Down
42 changes: 31 additions & 11 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/yohamta/dagu/internal/logger"
"github.com/yohamta/dagu/internal/mailer"
"github.com/yohamta/dagu/internal/models"
"github.com/yohamta/dagu/internal/pb"
"github.com/yohamta/dagu/internal/reporter"
"github.com/yohamta/dagu/internal/scheduler"
"github.com/yohamta/dagu/internal/sock"
Expand Down Expand Up @@ -162,18 +163,37 @@ func (a *Agent) signal(sig os.Signal, allowOverride bool) {

func (a *Agent) init() {
logDir := path.Join(a.DAG.LogDir, utils.ValidFilename(a.DAG.Name, "_"))
config := &scheduler.Config{
LogDir: logDir,
MaxActiveRuns: a.DAG.MaxActiveRuns,
Delay: a.DAG.Delay,
Dry: a.Dry,
RequestId: a.requestId,
}

if a.DAG.HandlerOn.Exit != nil {
onExit, _ := pb.ToPbStep(a.DAG.HandlerOn.Exit)
config.OnExit = onExit
}

if a.DAG.HandlerOn.Success != nil {
onSuccess, _ := pb.ToPbStep(a.DAG.HandlerOn.Success)
config.OnSuccess = onSuccess
}

if a.DAG.HandlerOn.Failure != nil {
onFailure, _ := pb.ToPbStep(a.DAG.HandlerOn.Failure)
config.OnFailure = onFailure
}

if a.DAG.HandlerOn.Cancel != nil {
onCancel, _ := pb.ToPbStep(a.DAG.HandlerOn.Cancel)
config.OnCancel = onCancel
}

a.scheduler = &scheduler.Scheduler{
Config: &scheduler.Config{
LogDir: logDir,
MaxActiveRuns: a.DAG.MaxActiveRuns,
Delay: a.DAG.Delay,
Dry: a.Dry,
OnExit: a.DAG.HandlerOn.Exit,
OnSuccess: a.DAG.HandlerOn.Success,
OnFailure: a.DAG.HandlerOn.Failure,
OnCancel: a.DAG.HandlerOn.Cancel,
RequestId: a.requestId,
}}
Config: config,
}
a.reporter = &reporter.Reporter{
Config: &reporter.Config{
Mailer: &mailer.Mailer{
Expand Down
210 changes: 210 additions & 0 deletions internal/pb/step_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package pb

import (
"github.com/yohamta/dagu/internal/dag"
// "google.golang.org/protobuf/encoding/protojson"
"fmt"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
durationpb "google.golang.org/protobuf/types/known/durationpb"
structpb "google.golang.org/protobuf/types/known/structpb"
wrapperspb "google.golang.org/protobuf/types/known/wrapperspb"
)

func ToDagStep(pbStep *Step) (*dag.Step, error) {
if pbStep == nil {
return nil, fmt.Errorf("pbStep must not be nil")
}
dagStep := &dag.Step{
Name: pbStep.Name,
Description: pbStep.Description,
Variables: pbStep.Variables,
Dir: pbStep.Dir,
CmdWithArgs: pbStep.CmdWithArgs,
Command: pbStep.Command,
Script: pbStep.Script,
Stdout: pbStep.Stdout,
Stderr: pbStep.Stderr,
Output: pbStep.Output,
Args: pbStep.Args,
Depends: pbStep.Depends,
MailOnError: pbStep.MailOnError,
SignalOnStop: pbStep.SignalOnStop,
}

if pbStep.ExecutorConfig != nil {
config := make(map[string]interface{}, len(pbStep.ExecutorConfig.Config))
for k, v := range pbStep.ExecutorConfig.Config {
vInterface, err := convertPbAnyToInterface(v)
if err != nil {
return nil, err
}
config[k] = vInterface
}

dagStep.ExecutorConfig = dag.ExecutorConfig{
Type: pbStep.ExecutorConfig.Type,
Config: config,
}
}

if pbStep.ContinueOn != nil {
dagStep.ContinueOn = dag.ContinueOn{
Failure: pbStep.ContinueOn.Failure,
Skipped: pbStep.ContinueOn.Skipped,
}
}

if pbStep.RetryPolicy != nil {
dagStep.RetryPolicy = &dag.RetryPolicy{
Limit: int(pbStep.RetryPolicy.Limit),
Interval: pbStep.RetryPolicy.Interval.AsDuration(),
}
}

if pbStep.RepeatPolicy != nil {
dagStep.RepeatPolicy = dag.RepeatPolicy{
Repeat: pbStep.RepeatPolicy.Repeat,
Interval: pbStep.RepeatPolicy.Interval.AsDuration(),
}
}

if pbStep.Preconditions != nil {
conditions := make([]*dag.Condition, len(pbStep.Preconditions))
for i, c := range pbStep.Preconditions {
conditions[i] = &dag.Condition{
Condition: c.Condition,
Expected: c.Expected,
}
}
dagStep.Preconditions = conditions
}

return dagStep, nil
}

func ToPbStep(dagStep *dag.Step) (*Step, error) {
if dagStep == nil {
return nil, fmt.Errorf("dagStep must not be nil")
}
step := &Step{
Name: dagStep.Name,
Description: dagStep.Description,
Variables: dagStep.Variables,
Dir: dagStep.Dir,
CmdWithArgs: dagStep.CmdWithArgs,
Command: dagStep.Command,
Script: dagStep.Script,
Stdout: dagStep.Stdout,
Stderr: dagStep.Stderr,
Output: dagStep.Output,
Args: dagStep.Args,
Depends: dagStep.Depends,
MailOnError: dagStep.MailOnError,
SignalOnStop: dagStep.SignalOnStop,
}

if &dagStep.ExecutorConfig != nil {
config := make(map[string]*anypb.Any, len(dagStep.ExecutorConfig.Config))
for k, v := range dagStep.ExecutorConfig.Config {
pMsg, err := convertToProtoMessage(v)
if err != nil {
return nil, err
}

any, err := anypb.New(pMsg)
if err != nil {
return nil, err
}

config[k] = any
}
step.ExecutorConfig = &ExecutorConfig{
Type: dagStep.ExecutorConfig.Type,
Config: config,
}
}

if &dagStep.ContinueOn != nil {
step.ContinueOn = &ContinueOn{
Failure: dagStep.ContinueOn.Failure,
Skipped: dagStep.ContinueOn.Skipped,
}
}

if dagStep.RetryPolicy != nil {
step.RetryPolicy = &RetryPolicy{
Limit: int32(dagStep.RetryPolicy.Limit),
Interval: durationpb.New(dagStep.RetryPolicy.Interval),
}
}

if &dagStep.RepeatPolicy != nil {
step.RepeatPolicy = &RepeatPolicy{
Repeat: dagStep.RepeatPolicy.Repeat,
Interval: durationpb.New(dagStep.RepeatPolicy.Interval),
}
}

if dagStep.Preconditions != nil {
conditions := make([]*Condition, len(dagStep.Preconditions))
for i, c := range dagStep.Preconditions {
conditions[i] = &Condition{
Condition: c.Condition,
Expected: c.Expected,
}
}
step.Preconditions = conditions
}

return step, nil
}

func convertPbAnyToInterface(any *anypb.Any) (interface{}, error) {
switch any.TypeUrl {
case "type.googleapis.com/google.protobuf.IntValue":
var intValue wrapperspb.Int32Value
if err := any.UnmarshalTo(&intValue); err != nil {
return nil, fmt.Errorf("could not unmarshal IntValue: %w", err)
}
return intValue.GetValue(), nil
case "type.googleapis.com/google.protobuf.StringValue":
var stringValue wrapperspb.StringValue
if err := any.UnmarshalTo(&stringValue); err != nil {
return nil, fmt.Errorf("could not unmarshal StringValue: %w", err)
}
return stringValue.GetValue(), nil
case "type.googleapis.com/google.protobuf.BoolValue":
var boolValue wrapperspb.BoolValue
if err := any.UnmarshalTo(&boolValue); err != nil {
return nil, fmt.Errorf("could not unmarshal BoolValue: %w", err)
}
return boolValue.GetValue(), nil
case "type.googleapis.com/google.protobuf.Struct":
var structValue structpb.Struct
if err := any.UnmarshalTo(&structValue); err != nil {
return nil, fmt.Errorf("could not unmarshal Struct: %w", err)
}
return structValue.AsMap(), nil
default:
return nil, fmt.Errorf("unknown type URL: %s", any.TypeUrl)
}
}

func convertToProtoMessage(v interface{}) (proto.Message, error) {
switch value := v.(type) {
case string:
return wrapperspb.String(value), nil
case int:
return wrapperspb.Int32(int32(value)), nil
case int32:
return wrapperspb.Int32(value), nil
case bool:
return wrapperspb.Bool(value), nil
case map[string]interface{}:
return structpb.NewStruct(value)
default:
return nil, fmt.Errorf("unsupported type: %T", v)
}
}
Loading

0 comments on commit c09deaf

Please sign in to comment.