Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: wasmbus Policy / Config / Secrets Services #157

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .github/workflows/wasmbus-go.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ jobs:
working-directory: x/wasmbus/events
run: go test -cover -v -wash-output

- name: wasmbus/policy
working-directory: x/wasmbus/policy
run: go test -cover -v -wash-output

- name: wasmbus/config
working-directory: x/wasmbus/config
run: go test -cover -v -wash-output

- name: wasmbus/secrets
working-directory: x/wasmbus/secrets
run: go test -cover -v -wash-output

examples:
# Context: https://github.com/golangci/golangci-lint-action/blob/v6.1.1/README.md#annotations
permissions:
Expand Down
4 changes: 4 additions & 0 deletions x/wasmbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ const (
PrefixEvents = "wasmbus.evt"
// PrefixControl is the prefix for Lattice RPC.
PrefixCtlV1 = "wasmbus.ctl.v1"

PrefixConfig = "wasmbus.cfg"

PrefixSecrets = "wasmcloud.secrets"
)

var (
Expand Down
39 changes: 39 additions & 0 deletions x/wasmbus/config/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package config

import (
"context"
"fmt"
)

var (
ErrProtocol = fmt.Errorf("encoding error")
ErrInternal = fmt.Errorf("internal error")
)

type API interface {
// Host is currently the only method exposed by the API.
Host(ctx context.Context, req *HostRequest) (*HostResponse, error)
}

var _ API = (*APIMock)(nil)

type APIMock struct {
HostFunc func(ctx context.Context, req *HostRequest) (*HostResponse, error)
}

func (m *APIMock) Host(ctx context.Context, req *HostRequest) (*HostResponse, error) {
return m.HostFunc(ctx, req)
}

type HostRequest struct {
Labels map[string]string `json:"labels"`
}

type HostResponse struct {
RegistryCredentials map[string]RegistryCredential `json:"registryCredentials,omitempty"`
}

type RegistryCredential struct {
Username string `json:"username"`
Password string `json:"password"`
}
27 changes: 27 additions & 0 deletions x/wasmbus/config/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package config

import (
"fmt"

"go.wasmcloud.dev/x/wasmbus"
)

type Server struct {
*wasmbus.Server
Lattice string
api API
}

func NewServer(bus wasmbus.Bus, lattice string, api API) *Server {
return &Server{
Server: wasmbus.NewServer(bus),
Lattice: lattice,
api: api,
}
}

func (s *Server) Serve() error {
subject := fmt.Sprintf("%s.%s.req", wasmbus.PrefixConfig, s.Lattice)
handler := wasmbus.NewRequestHandler(HostRequest{}, HostResponse{}, s.api.Host)
return s.RegisterHandler(subject, handler)
}
68 changes: 68 additions & 0 deletions x/wasmbus/config/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package config

import (
"context"
"fmt"
"testing"
"time"

"github.com/nats-io/nats.go"
"go.wasmcloud.dev/x/wasmbus"
"go.wasmcloud.dev/x/wasmbus/wasmbustest"
)

func TestServer(t *testing.T) {
defer wasmbustest.MustStartNats(t)()

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
t.Fatalf("failed to connect to nats: %v", err)
}
bus := wasmbus.NewNatsBus(nc)
s := NewServer(bus, "test", &APIMock{
HostFunc: func(ctx context.Context, req *HostRequest) (*HostResponse, error) {
return &HostResponse{
RegistryCredentials: map[string]RegistryCredential{
"docker.io": {
Username: "my-username",
Password: "hunter2",
},
},
}, nil
},
})
if err := s.Serve(); err != nil {
t.Fatalf("failed to start server: %v", err)
}

req := wasmbus.NewMessage(fmt.Sprintf("%s.%s.req", wasmbus.PrefixConfig, "test"))
req.Data = []byte(`{"labels":{"hostcore.arch":"aarch64","hostcore.os":"linux","hostcore.osfamily":"unix","kubernetes":"true","kubernetes.hostgroup":"default"}}`)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

rawResp, err := bus.Request(ctx, req)
if err != nil {
t.Fatal(err)
}

var resp HostResponse
if err := wasmbus.Decode(rawResp, &resp); err != nil {
t.Fatal(err)
}

docker, ok := resp.RegistryCredentials["docker.io"]
if !ok {
t.Fatalf("expected docker.io registry credentials")
}
if want, got := "my-username", docker.Username; want != got {
t.Fatalf("expected username %q, got %q", want, got)
}

if want, got := "hunter2", docker.Password; want != got {
t.Fatalf("expected password %q, got %q", want, got)
}

if err := s.Drain(); err != nil {
t.Fatalf("failed to drain server: %v", err)
}
}
3 changes: 2 additions & 1 deletion x/wasmbus/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ go 1.23.3
require (
github.com/cloudevents/sdk-go/v2 v2.15.2
github.com/goccy/go-yaml v1.15.13
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/nats-io/nats-server/v2 v2.10.24
github.com/nats-io/nats.go v1.38.0
github.com/nats-io/nkeys v0.4.9
)

require (
Expand All @@ -17,7 +19,6 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.7.3 // indirect
github.com/nats-io/nkeys v0.4.9 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/stretchr/testify v1.10.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions x/wasmbus/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/goccy/go-yaml v1.15.13 h1:Xd87Yddmr2rC1SLLTm2MNDcTjeO/GYo0JGiww6gSTDg=
github.com/goccy/go-yaml v1.15.13/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk=
github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
Expand Down
122 changes: 122 additions & 0 deletions x/wasmbus/policy/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package policy

import (
"context"
"fmt"
)

var (
ErrProtocol = fmt.Errorf("encoding error")
ErrInternal = fmt.Errorf("internal error")
)

type API interface {
// PerformInvocation is called when a component is invoked
PerformInvocation(ctx context.Context, req *PerformInvocationRequest) (*Response, error)
// StartComponent is called when a component is started
StartComponent(ctx context.Context, req *StartComponentRequest) (*Response, error)
// StartProvider is called when a provider is started
StartProvider(ctx context.Context, req *StartProviderRequest) (*Response, error)
}

var _ API = (*APIMock)(nil)

type APIMock struct {
PerformInvocationFunc func(ctx context.Context, req *PerformInvocationRequest) (*Response, error)
StartComponentFunc func(ctx context.Context, req *StartComponentRequest) (*Response, error)
StartProviderFunc func(ctx context.Context, req *StartProviderRequest) (*Response, error)
}

func (m *APIMock) PerformInvocation(ctx context.Context, req *PerformInvocationRequest) (*Response, error) {
return m.PerformInvocationFunc(ctx, req)
}

func (m *APIMock) StartComponent(ctx context.Context, req *StartComponentRequest) (*Response, error) {
return m.StartComponentFunc(ctx, req)
}

func (m *APIMock) StartProvider(ctx context.Context, req *StartProviderRequest) (*Response, error) {
return m.StartProviderFunc(ctx, req)
}

// Request is the structure of the request sent to the policy engine
type BaseRequest[T any] struct {
Id string `json:"requestId"`
Kind string `json:"kind"`
Version string `json:"version"`
Host Host `json:"host"`
Request T `json:"request"`
}

// Decision is a helper function to create a response
func (r BaseRequest[T]) Decision(allowed bool, msg string) *Response {
return &Response{
Id: r.Id,
Permitted: allowed,
Message: msg,
}
}

// Deny is a helper function to create a response with a deny decision
func (r BaseRequest[T]) Deny(msg string) *Response {
return r.Decision(false, msg)
}

// Allow is a helper function to create a response with an allow decision
func (r BaseRequest[T]) Allow(msg string) *Response {
return r.Decision(true, msg)
}

// Response is the structure of the response sent by the policy engine
type Response struct {
Id string `json:"requestId"`
Permitted bool `json:"permitted"`
Message string `json:"message,omitempty"`
}

type Claims struct {
PublicKey string `json:"publicKey"`
Issuer string `json:"issuer"`
IssuedAt int `json:"issuedAt"`
ExpiresAt int `json:"expiresAt"`
Expired bool `json:"expired"`
}

type StartComponentPayload struct {
ComponentId string `json:"componentId"`
ImageRef string `json:"imageRef"`
MaxInstances int `json:"maxInstances"`
Annotations map[string]string `json:"annotations"`
}

type StartComponentRequest = BaseRequest[StartComponentPayload]

type StartProviderPayload struct {
ProviderId string `json:"providerId"`
ImageRef string `json:"imageRef"`
Annotations map[string]string `json:"annotations"`
}

type StartProviderRequest = BaseRequest[StartProviderPayload]

type PerformInvocationPayload struct {
Interface string `json:"interface"`
Function string `json:"function"`
// NOTE(lxf): this covers components but not providers. wut?!?
Target InvocationTarget `json:"target"`
Comment on lines +105 to +106
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By covers, do you mean that this isn't present when providers call components? Or that you can't actually use this to call a provider

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldnt find any mention to providers related to Policy, and the InvocationTarget struct doesn't include any provider info.

it's unclear if this is by design or if we missed providers here.

}

type PerformInvocationRequest = BaseRequest[PerformInvocationPayload]

type InvocationTarget struct {
ComponentId string `json:"componentId"`
ImageRef string `json:"imageRef"`
MaxInstances int `json:"maxInstances"`
Annotations map[string]string `json:"annotations"`
}

type Host struct {
PublicKey string `json:"publicKey"`
Lattice string `json:"lattice"`
Labels map[string]string `json:"labels"`
}
59 changes: 59 additions & 0 deletions x/wasmbus/policy/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package policy

import (
"context"
"encoding/json"
"fmt"

"go.wasmcloud.dev/x/wasmbus"
)

type Server struct {
*wasmbus.Server
subject string
api API
}

func NewServer(bus wasmbus.Bus, subject string, api API) *Server {
return &Server{
Server: wasmbus.NewServer(bus),
subject: subject,
api: api,
}
}

func (s *Server) Serve() error {
handler := wasmbus.NewTypedHandler(extractType)

startComponent := wasmbus.NewRequestHandler(StartComponentRequest{}, Response{}, s.api.StartComponent)
if err := handler.RegisterType("startComponent", startComponent); err != nil {
return err
}

startProvider := wasmbus.NewRequestHandler(StartProviderRequest{}, Response{}, s.api.StartProvider)
if err := handler.RegisterType("startProvider", startProvider); err != nil {
return err
}

performInvocation := wasmbus.NewRequestHandler(PerformInvocationRequest{}, Response{}, s.api.PerformInvocation)
if err := handler.RegisterType("performInvocation", performInvocation); err != nil {
return err
}

return s.RegisterHandler(s.subject, handler)
}

func extractType(ctx context.Context, msg *wasmbus.Message) (string, error) {
var baseReq BaseRequest[json.RawMessage]

if err := wasmbus.Decode(msg, &baseReq); err != nil {
return "", err
}

switch baseReq.Kind {
case "startComponent", "startProvider", "performInvocation":
return baseReq.Kind, nil
default:
return "", fmt.Errorf("unknown request kind: %s", baseReq.Kind)
}
}
Loading
Loading