Skip to content

Commit

Permalink
NoDAG prototyping
Browse files Browse the repository at this point in the history
  • Loading branch information
bolekk committed Jan 24, 2025
1 parent a36a3f7 commit 867757c
Show file tree
Hide file tree
Showing 9 changed files with 1,025 additions and 183 deletions.
40 changes: 29 additions & 11 deletions pkg/workflows/sdk/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,30 @@ import (

// WorkflowSpecFactory is used to build WorkflowSpecs.
type WorkflowSpecFactory struct {
spec *WorkflowSpec
names map[string]bool
duplicateNames map[string]bool
emptyNames bool
badCapTypes []string
errors []error
fns map[string]func(runtime Runtime, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error)
spec *WorkflowSpec
names map[string]bool
duplicateNames map[string]bool
emptyNames bool
badCapTypes []string
errors []error
fns map[string]func(runtime Runtime, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error)
runFnsPerTrigger map[string]func(runtime RuntimeV2, triggerEvent capabilities.TriggerEvent) error
}

// One Run() function per registered trigger
func AddRunFunctionForTrigger[TriggerConfig any, TriggerOutputs any](w *WorkflowSpecFactory, triggerRef string, triggerCfg TriggerConfig, fn func(runtime RuntimeV2, triggerOutputs TriggerOutputs) error) {
w.runFnsPerTrigger[triggerRef] = func(runtime RuntimeV2, triggerEvent capabilities.TriggerEvent) error {
var triggerOutputs TriggerOutputs
err := triggerEvent.Outputs.UnwrapTo(&triggerOutputs)
if err != nil {
return err
}
return fn(runtime, triggerOutputs)
}
}

func (w *WorkflowSpecFactory) GetRunFn(triggerRef string) func(runtime RuntimeV2, triggerEvent capabilities.TriggerEvent) error {
return w.runFnsPerTrigger[triggerRef]
}

func (w *WorkflowSpecFactory) GetFn(name string) func(sdk Runtime, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
Expand Down Expand Up @@ -136,10 +153,11 @@ func NewWorkflowSpecFactory() *WorkflowSpecFactory {
Consensus: make([]StepDefinition, 0),
Targets: make([]StepDefinition, 0),
},
names: map[string]bool{},
duplicateNames: map[string]bool{},
errors: []error{},
emptyNames: false,
names: map[string]bool{},
duplicateNames: map[string]bool{},
errors: []error{},
emptyNames: false,
runFnsPerTrigger: map[string]func(runtime RuntimeV2, triggerEvent capabilities.TriggerEvent) error{},
}
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/workflows/sdk/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,13 @@ type FetchResponse struct {
Headers map[string]any `json:"headers,omitempty"` // HTTP headers
Body []byte `json:"body,omitempty"` // HTTP response body
}

type RuntimeV2 interface {
CallCapabilities(calls ...CapabilityCallPromise) error
}

// weakly-typed, for the runtime to fulfill
type CapabilityCallPromise interface {
CallInfo() (ref string, capId string, request capabilities.CapabilityRequest)
Fulfill(response capabilities.CapabilityResponse, err error)
}
55 changes: 55 additions & 0 deletions pkg/workflows/wasm/host/test/nodag/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//go:build wasip1

package main

import (
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli/cmd/testdata/fixtures/capabilities/basicaction"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli/cmd/testdata/fixtures/capabilities/basictarget"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli/cmd/testdata/fixtures/capabilities/basictrigger"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm"
)

func InitWorkflow(config []byte) *sdk.WorkflowSpecFactory {
workflow := sdk.NewWorkflowSpecFactory()
// add triggers
triggerCfg := basictrigger.TriggerConfig{Name: "trigger", Number: 100}
_ = triggerCfg.New(workflow) // TODO: let AddRunFunctionForTrigger() add this to workflow spec
sdk.AddRunFunctionForTrigger(workflow, "trigger", triggerCfg, RunWorkflow)
return workflow
}

func RunWorkflow(runtime sdk.RuntimeV2, triggerOutputs basictrigger.TriggerOutputs) error {
// two action calls "futures"
actionCall1, _ := wasm.NewCapabilityCall[basicaction.ActionInputs, basicaction.ActionConfig, basicaction.ActionOutputs](
"ref_action1", "[email protected]", basicaction.ActionInputs{}, basicaction.ActionConfig{},
)
actionCall2, _ := wasm.NewCapabilityCall[basicaction.ActionInputs, basicaction.ActionConfig, basicaction.ActionOutputs](
"ref_action2", "[email protected]", basicaction.ActionInputs{}, basicaction.ActionConfig{},
)

// blocking "await" on multiple calls at once
err := runtime.CallCapabilities(actionCall1, actionCall2)
if err != nil {
return err
}

// some compute before calling a target
actionOutputs1, _ := actionCall1.Result()
actionOutputs2, _ := actionCall2.Result()
if len(actionOutputs1.AdaptedThing) <= len(actionOutputs2.AdaptedThing) {
// a single target call
inputStr := "abcd"
targetCall, _ := wasm.NewCapabilityCall[basictarget.TargetInputs, basictarget.TargetConfig, any](
"ref_target1", "[email protected]", basictarget.TargetInputs{CoolInput: &inputStr}, basictarget.TargetConfig{},
)
return runtime.CallCapabilities(targetCall)
}
return nil
}

func main() {
runner := wasm.NewRunnerV2()
workflow := InitWorkflow(runner.Config())
runner.Run(workflow)
}
123 changes: 123 additions & 0 deletions pkg/workflows/wasm/host/wasm_v2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package host

import (
_ "embed"
"testing"

"github.com/google/uuid"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink-common/pkg/values"
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"
)

const (
nodagBinaryLocation = "test/nodag/testmodule.wasm"
nodagBinaryCmd = "test/nodag"
)

func Test_V2_Run(t *testing.T) {
t.Parallel()
ctx := tests.Context(t)
mc := &ModuleConfig{
Logger: logger.Test(t),
IsUncompressed: true,
}
capResponsesSoFar := map[string]*pb.CapabilityResponse{}
triggerRef := "trigger"
binary := createTestBinary(nodagBinaryCmd, nodagBinaryLocation, true, t)

// (1) Engine calls GetWorkflowSpec() first.
spec, err := GetWorkflowSpec(ctx, mc, binary, []byte(""))
require.NoError(t, err)

// (2) Engine expects only triggers to be included in the returned spec. Trigger subscriptions are performed.
//
// [trigger]
// |
// (promise)
require.Len(t, spec.Triggers, 1)
require.Equal(t, triggerRef, spec.Triggers[0].Ref)
require.Len(t, spec.Actions, 0)
require.Len(t, spec.Consensus, 0)
require.Len(t, spec.Targets, 0)

m, err := NewModule(mc, binary)
require.NoError(t, err)
m.Start()

// (3) When a TriggerEvent occurs, Engine calls Run() with that Event.
triggerEvent := &pb.TriggerEvent{
TriggerType: "[email protected]",
Outputs: values.ProtoMap(values.EmptyMap()),
}

req := newRunRequest(triggerRef, triggerEvent, capResponsesSoFar)
resp, err := m.Run(ctx, req)
require.NoError(t, err)
runResp := resp.GetRunResponse()
require.NotNil(t, runResp)

// (4) In the first response, the Workflow requests two action capability calls.
//
// [trigger]
// |
// [compute1]
// / \
// [action1] [action2]
// \ /
// (promise)
require.Len(t, runResp.RefToCapCall, 2)
require.Contains(t, runResp.RefToCapCall, "ref_action1")
require.Contains(t, runResp.RefToCapCall, "ref_action2")

// (5) Engine now makes capability calls and when they are ready, it invokes Run() again with both responses.
capResponsesSoFar["ref_action1"] = pb.CapabilityResponseToProto(capabilities.CapabilityResponse{Value: &values.Map{}})
capResponsesSoFar["ref_action2"] = pb.CapabilityResponseToProto(capabilities.CapabilityResponse{Value: &values.Map{}})
req = newRunRequest(triggerRef, triggerEvent, capResponsesSoFar)
resp, err = m.Run(ctx, req)
require.NoError(t, err)
runResp = resp.GetRunResponse()
require.NotNil(t, runResp)

// (6) Workflow now requests a target capability call.
//
// [trigger]
// |
// [compute1]
// / \
// [action1] [action2]
// \ /
// [compute2]
// |
// [target1]
// |
// (promise)
require.Len(t, runResp.RefToCapCall, 1)
require.Contains(t, runResp.RefToCapCall, "ref_target1")

// (7) After calling the target, Engine makes one last Run() call and expects the workflow to complete without errors.
capResponsesSoFar["ref_target1"] = pb.CapabilityResponseToProto(capabilities.CapabilityResponse{Value: &values.Map{}})
req = newRunRequest(triggerRef, triggerEvent, capResponsesSoFar)
resp, err = m.Run(ctx, req)
require.NoError(t, err)
runResp = resp.GetRunResponse()
require.Nil(t, runResp)
}

func newRunRequest(triggerRef string, triggerEvent *pb.TriggerEvent, capResponsesSoFar map[string]*pb.CapabilityResponse) *wasmpb.Request {
return &wasmpb.Request{
Id: uuid.New().String(),
Message: &wasmpb.Request_RunRequest{
RunRequest: &wasmpb.RunRequest{
TriggerRef: triggerRef,
TriggerEvent: triggerEvent,
RefToResponse: capResponsesSoFar,
},
},
}
}
Loading

0 comments on commit 867757c

Please sign in to comment.