Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NoDAG prototyping #1011

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/workflows/sdk/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,14 @@ type FetchResponse struct {
Headers map[string]any `json:"headers,omitempty"` // HTTP headers
Body []byte `json:"body,omitempty"` // HTTP response body
}

type RuntimeV2 interface {
CallCapability(call CapabilityCallPromise) error
AwaitCapabilities(calls ...CapabilityCallPromise) error
}

// weakly-typed, for the runtime to fulfill
type CapabilityCallPromise interface {
CallInfo() (ref string, capId string, request capabilities.CapabilityRequest)
Fulfill(response capabilities.CapabilityResponse, err error)
}
86 changes: 86 additions & 0 deletions pkg/workflows/wasm/host/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ type ModuleConfig struct {
// If Determinism is set, the module will override the random_get function in the WASI API with
// the provided seed to ensure deterministic behavior.
Determinism *DeterminismConfig

CallCapAsync func(req *wasmpb.CapabilityCall) error
AwaitCaps func(req *wasmpb.AwaitRequest) (*wasmpb.AwaitResponse, error)
}

type Module struct {
Expand Down Expand Up @@ -272,6 +275,24 @@ func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig))
return nil, fmt.Errorf("error wrapping emit func: %w", err)
}

err = linker.FuncWrap(
"env",
"callcap",
createCallCapFn(logger, modCfg.CallCapAsync),
)
if err != nil {
return nil, fmt.Errorf("error wrapping callcap func: %w", err)
}

err = linker.FuncWrap(
"env",
"awaitcaps",
createAwaitCapsFn(logger, modCfg.AwaitCaps, wasmRead, wasmWrite, wasmWriteUInt32),
)
if err != nil {
return nil, fmt.Errorf("error wrapping awaitcaps func: %w", err)
}

m := &Module{
engine: engine,
module: mod,
Expand Down Expand Up @@ -651,6 +672,71 @@ func createLogFn(logger logger.Logger) func(caller *wasmtime.Caller, ptr int32,
}
}

func createCallCapFn(logger logger.Logger, callCapAsync func(req *wasmpb.CapabilityCall) error) func(caller *wasmtime.Caller, ptr int32, ptrlen int32) int32 {
return func(caller *wasmtime.Caller, ptr int32, ptrlen int32) int32 {
b, innerErr := wasmRead(caller, ptr, ptrlen)
if innerErr != nil {
logger.Errorf("error calling wasmRead: %s", innerErr)
return 1
}

req := &wasmpb.CapabilityCall{}
innerErr = proto.Unmarshal(b, req)
if innerErr != nil {
logger.Errorf("error calling proto unmarshal: %s", innerErr)
return 1
}
err := callCapAsync(req)
if err != nil {
logger.Errorf("error calling callCapAsync: %s", err)
return 1
}
return 0
}
}

func createAwaitCapsFn(
l logger.Logger,
awaitCaps func(req *wasmpb.AwaitRequest) (*wasmpb.AwaitResponse, error),
reader unsafeReaderFunc,
writer unsafeWriterFunc,
sizeWriter unsafeFixedLengthWriterFunc,
) func(caller *wasmtime.Caller, respptr, resplenptr, msgptr, msglen int32) int32 {
return func(caller *wasmtime.Caller, respptr, resplenptr, msgptr, msglen int32) int32 {
// TODO error handling
b, err := reader(caller, msgptr, msglen)
if err != nil {
return 1
}

req := &wasmpb.AwaitRequest{}
err = proto.Unmarshal(b, req)
if err != nil {
return 1
}

resp, err := awaitCaps(req)
if err != nil {
return 1
}

respBytes, err := proto.Marshal(resp)
if err != nil {
return 1
}

if size := writer(caller, respBytes, respptr, int32(len(respBytes))); size == -1 {
return 1
}

if size := sizeWriter(caller, resplenptr, uint32(len(respBytes))); size == -1 {
return 1
}

return ErrnoSuccess
}
}

type unimplementedMessageEmitter struct{}

func (u *unimplementedMessageEmitter) Emit(context.Context, string) error {
Expand Down
47 changes: 47 additions & 0 deletions pkg/workflows/wasm/host/test/nodag/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//go:build wasip1

package main

import (
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli/cmd/testdata/fixtures/capabilities/basicaction"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli/cmd/testdata/fixtures/capabilities/basictarget"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli/cmd/testdata/fixtures/capabilities/basictrigger"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm"
)

func main() {
runner := wasm.NewRunnerV2()
triggerCfg := basictrigger.TriggerConfig{Number: 100}
_ = wasm.SubscribeToTrigger(runner, "[email protected]", triggerCfg, OnBasicTriggerEvent)
runner.Run()
}

func OnBasicTriggerEvent(runtime sdk.RuntimeV2, triggerOutputs basictrigger.TriggerOutputs) error {
// two async capability calls
actionCall1, _ := wasm.CallCapability[basicaction.ActionInputs, basicaction.ActionConfig, basicaction.ActionOutputs](
runtime, "ref_action1", "[email protected]", basicaction.ActionInputs{}, basicaction.ActionConfig{},
)
actionCall2, _ := wasm.CallCapability[basicaction.ActionInputs, basicaction.ActionConfig, basicaction.ActionOutputs](
runtime, "ref_action2", "[email protected]", basicaction.ActionInputs{}, basicaction.ActionConfig{},
)

// blocking "await" on multiple calls at once
err := runtime.AwaitCapabilities(actionCall1, actionCall2)
if err != nil {
return err
}

// some compute before calling a target
actionOutputs1, _ := actionCall1.Result()
actionOutputs2, _ := actionCall2.Result()
if len(actionOutputs1.AdaptedThing) <= len(actionOutputs2.AdaptedThing) {
// a single target call
inputStr := "abcd"
targetCall, _ := wasm.CallCapability[basictarget.TargetInputs, basictarget.TargetConfig, any](
runtime, "ref_target1", "[email protected]", basictarget.TargetInputs{CoolInput: &inputStr}, basictarget.TargetConfig{},
)
return runtime.AwaitCapabilities(targetCall)
}
return nil
}
137 changes: 137 additions & 0 deletions pkg/workflows/wasm/host/wasm_v2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package host

import (
_ "embed"
"testing"

"github.com/google/uuid"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink-common/pkg/values"
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"
)

const (
nodagBinaryLocation = "test/nodag/testmodule.wasm"
nodagBinaryCmd = "test/nodag"
)

func Test_V2_Run(t *testing.T) {
t.Parallel()
ctx := tests.Context(t)
calls := make(chan *wasmpb.CapabilityCall, 10)
awaitReq := make(chan *wasmpb.AwaitRequest, 1)
awaitResp := make(chan *wasmpb.AwaitResponse, 1)
mc := &ModuleConfig{
Logger: logger.Test(t),
IsUncompressed: true,
CallCapAsync: func(req *wasmpb.CapabilityCall) error {
calls <- req
return nil
},
AwaitCaps: func(req *wasmpb.AwaitRequest) (*wasmpb.AwaitResponse, error) {
awaitReq <- req
return <-awaitResp, nil
},
}
capResponses := map[string]*pb.CapabilityResponse{}
triggerID := "[email protected]"
triggerRef := "trigger-0"
binary := createTestBinary(nodagBinaryCmd, nodagBinaryLocation, true, t)

// (1) Engine calls GetWorkflowSpec() first.
spec, err := GetWorkflowSpec(ctx, mc, binary, []byte(""))
require.NoError(t, err)

// (2) Engine expects only triggers to be included in the returned spec. Trigger subscriptions are performed.
//
// [trigger]
// |
// (promise)
require.Len(t, spec.Triggers, 1)
require.Equal(t, triggerID, spec.Triggers[0].ID)
require.Equal(t, triggerRef, spec.Triggers[0].Ref)
require.Len(t, spec.Actions, 0)
require.Len(t, spec.Consensus, 0)
require.Len(t, spec.Targets, 0)

m, err := NewModule(mc, binary)
require.NoError(t, err)
m.Start()

// (3) When a TriggerEvent occurs, Engine calls Run() with that Event.
triggerEvent := &pb.TriggerEvent{
TriggerType: triggerID,
Outputs: values.ProtoMap(values.EmptyMap()),
}

doneCh := make(chan struct{})
go func() {
req := newRunRequest(triggerRef, triggerEvent, capResponses)
_, err := m.Run(ctx, req)
require.NoError(t, err)
close(doneCh)
}()

// (4) The workflow makes two capability calls and then awaits them.
//
// [trigger]
// |
// [compute1]
// / \
// [action1] [action2]
// \ /
// (promise)
call1 := <-calls
require.Equal(t, "ref_action1", call1.Ref)
call2 := <-calls
require.Equal(t, "ref_action2", call2.Ref)

// (5) Engine performs async capability calls.
capResponses["ref_action1"] = pb.CapabilityResponseToProto(capabilities.CapabilityResponse{Value: &values.Map{}})
capResponses["ref_action2"] = pb.CapabilityResponseToProto(capabilities.CapabilityResponse{Value: &values.Map{}})
awaitReqMsg := <-awaitReq
require.Len(t, awaitReqMsg.Refs, 2)
awaitResp <- &wasmpb.AwaitResponse{RefToResponse: capResponses}

// (6) Workflow now makes a target capability call.
//
// [trigger]
// |
// [compute1]
// / \
// [action1] [action2]
// \ /
// [compute2]
// |
// [target1]
// |
// (promise)
call3 := <-calls
require.Equal(t, "ref_target1", call3.Ref)

// (7) Engine performs the call.
capResponses["ref_target1"] = pb.CapabilityResponseToProto(capabilities.CapabilityResponse{Value: &values.Map{}})
awaitReqMsg = <-awaitReq
require.Len(t, awaitReqMsg.Refs, 1)
awaitResp <- &wasmpb.AwaitResponse{RefToResponse: capResponses}

<-doneCh
}

func newRunRequest(triggerRef string, triggerEvent *pb.TriggerEvent, capResponsesSoFar map[string]*pb.CapabilityResponse) *wasmpb.Request {
return &wasmpb.Request{
Id: uuid.New().String(),
Message: &wasmpb.Request_RunRequest{
RunRequest: &wasmpb.RunRequest{
TriggerRef: triggerRef,
TriggerEvent: triggerEvent,
RefToResponse: capResponsesSoFar,
},
},
}
}
Loading
Loading