Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add host services package #393

Draft
wants to merge 1 commit into
base: vnext
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@ require (
github.com/nats-io/nats-server/v2 v2.10.20
github.com/nats-io/nats.go v1.37.0
github.com/nats-io/nkeys v0.4.7
github.com/vincent-petithory/dataurl v1.0.0
go.opentelemetry.io/otel v1.30.0
go.opentelemetry.io/otel/trace v1.30.0
)

require (
ergo.services/meta v0.0.0-20240904054930-a97f6add8a78 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/charmbracelet/lipgloss v0.13.0 // indirect
github.com/charmbracelet/x/ansi v0.2.3 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
Expand All @@ -26,7 +31,9 @@ require (
github.com/nats-io/jwt/v2 v2.6.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
go.opentelemetry.io/otel/metric v1.30.0 // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
golang.org/x/time v0.6.0 // indirect
)
17 changes: 17 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ github.com/charmbracelet/x/ansi v0.2.3 h1:VfFN0NUpcjBRd4DnKfRaIRo53KRgey/nhOoEqo
github.com/charmbracelet/x/ansi v0.2.3/go.mod h1:dk73KoMTT5AX5BsX0KrqhsTqAnhZZoCBjs7dGWp4Ktw=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
Expand Down Expand Up @@ -45,12 +52,22 @@ github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/vincent-petithory/dataurl v1.0.0 h1:cXw+kPto8NLuJtlMsI152irrVw9fRDX8AbShPRpg2CI=
github.com/vincent-petithory/dataurl v1.0.0/go.mod h1:FHafX5vmDzyP+1CQATJn7WFKc9CvnvxyvZy6I1MrG/U=
go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts=
go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc=
go.opentelemetry.io/otel/metric v1.30.0 h1:4xNulvn9gjzo4hjg+wzIKG7iNFEaBMX00Qd4QIZs7+w=
go.opentelemetry.io/otel/metric v1.30.0/go.mod h1:aXTfST94tswhWEb+5QjlSqG+cZlmyXy/u8jFpor3WqQ=
go.opentelemetry.io/otel/trace v1.30.0 h1:7UBkkYzeg3C7kQX8VAidWh2biiQbtAKjyIML8dQ9wmc=
go.opentelemetry.io/otel/trace v1.30.0/go.mod h1:5EyKqTzzmyqB9bwtCCq6pDLktPK6fmGf/Dph+8VI02o=
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
318 changes: 318 additions & 0 deletions node/services/builtins/builtins_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,318 @@
package builtins

import (
"context"
"encoding/json"
"errors"

"github.com/nats-io/nats.go"
hostservices "github.com/synadia-io/nex/node/services"
)

const BucketContextHeader = "x-context-bucket"

type HostServicesHTTPRequest struct {
Method string `json:"method"`
URL string `json:"url"`

Body *string `json:"body,omitempty"`
Headers *json.RawMessage `json:"headers,omitempty"`

// FIXME-- this is very poorly named currently...
//these params are parsed as an object and serialized as part of the query string
Params *json.RawMessage `json:"params,omitempty"`
}

type HostServicesHTTPResponse struct {
Status int `json:"status"`
Headers *json.RawMessage `json:"headers,omitempty"`
Body string `json:"body"`

Error *string `json:"error,omitempty"`
}

type HostServicesKeyValueResponse struct {
Revision int64 `json:"revision,omitempty"`
Success *bool `json:"success,omitempty"`

Errors []string `json:"errors,omitempty"`
}

type HostServicesObjectStoreResponse struct {
Errors []string `json:"errors,omitempty"`
Success bool `json:"success,omitempty"`
}

type HostServicesMessagingRequest struct {
Subject *string `json:"key"`
Payload *json.RawMessage `json:"payload,omitempty"`
}

type HostServicesMessagingResponse struct {
Errors []string `json:"errors,omitempty"`
Success bool `json:"success,omitempty"`
}

type BuiltinServicesClient struct {
hsClient *hostservices.HostServicesClient
}

const (
builtinServiceNameKeyValue = "kv"
builtinServiceNameHttpClient = "http"
builtinServiceNameMessaging = "messaging"
builtinServiceNameObjectStore = "objectstore"
)

func NewBuiltinServicesClient(hsClient *hostservices.HostServicesClient) *BuiltinServicesClient {
return &BuiltinServicesClient{
hsClient: hsClient,
}
}

func (c *BuiltinServicesClient) KVGet(ctx context.Context, bucket string, key string) ([]byte, error) {
metadata := map[string]string{
KeyValueKeyHeader: key,
BucketContextHeader: bucket,
}

resp, err := c.hsClient.PerformRPC(ctx, builtinServiceNameKeyValue, kvServiceMethodGet, []byte{}, metadata)
if err != nil {
return nil, err
}

if resp.IsError() {
return nil, resp.Error()
}

return resp.Data, nil
}

func (c *BuiltinServicesClient) KVSet(ctx context.Context, bucket string, key string, value []byte) (*HostServicesKeyValueResponse, error) {
metadata := map[string]string{
KeyValueKeyHeader: key,
BucketContextHeader: bucket,
}

resp, err := c.hsClient.PerformRPC(ctx, builtinServiceNameKeyValue, kvServiceMethodSet, value, metadata)
if err != nil {
return nil, err
}
if resp.IsError() {
return nil, resp.Error()
}

var kvResponse HostServicesKeyValueResponse
err = json.Unmarshal(resp.Data, &kvResponse)
if err != nil {
return nil, err
}

return &kvResponse, nil
}

func (c *BuiltinServicesClient) KVDelete(ctx context.Context, bucket string, key string) (*HostServicesKeyValueResponse, error) {
metadata := map[string]string{
KeyValueKeyHeader: key,
BucketContextHeader: bucket,
}

resp, err := c.hsClient.PerformRPC(ctx, builtinServiceNameKeyValue, kvServiceMethodDelete, []byte{}, metadata)
if err != nil {
return nil, err
}
if resp.IsError() {
return nil, resp.Error()
}

var kvResponse HostServicesKeyValueResponse
err = json.Unmarshal(resp.Data, &kvResponse)
if err != nil {
return nil, err
}

return &kvResponse, nil
}

func (c *BuiltinServicesClient) KVKeys(ctx context.Context, bucket string) ([]string, error) {
metadata := map[string]string{
BucketContextHeader: bucket,
}

resp, err := c.hsClient.PerformRPC(ctx, builtinServiceNameKeyValue, kvServiceMethodKeys, []byte{}, metadata)
if err != nil {
return nil, err
}
if resp.IsError() {
return nil, resp.Error()
}
var results []string
err = json.Unmarshal(resp.Data, &results)
if err != nil {
return nil, err
}

return results, err
}

func (c *BuiltinServicesClient) MessagingPublish(ctx context.Context, subject string, payload []byte) error {
metadata := map[string]string{
MessagingSubjectHeader: subject,
}

resp, err := c.hsClient.PerformRPC(ctx, builtinServiceNameMessaging, messagingServiceMethodPublish, payload, metadata)
if err != nil {
return err
}
if resp.IsError() {
return resp.Error()
}

var response HostServicesMessagingResponse
err = json.Unmarshal(resp.Data, &response)
if err != nil {
return err
}

if !response.Success {
es := make([]error, 0)
for _, e := range response.Errors {
es = append(es, errors.New(e))
}
return errors.Join(es...)
}

return nil
}

func (c *BuiltinServicesClient) MessagingRequest(ctx context.Context, subject string, payload []byte) ([]byte, error) {
metadata := map[string]string{
MessagingSubjectHeader: subject,
}
resp, err := c.hsClient.PerformRPC(ctx, builtinServiceNameMessaging, messagingServiceMethodRequest, payload, metadata)
if err != nil {
return nil, err
}
if resp.IsError() {
return nil, resp.Error()
}

return resp.Data, nil
}

func (c *BuiltinServicesClient) ObjectGet(ctx context.Context, bucket string, objectName string) ([]byte, error) {
metadata := map[string]string{
ObjectStoreObjectNameHeader: objectName,
BucketContextHeader: bucket,
}

resp, err := c.hsClient.PerformRPC(ctx, builtinServiceNameObjectStore, objectStoreServiceMethodGet, []byte{}, metadata)
if err != nil {
return nil, err
}
if resp.IsError() {
return nil, resp.Error()
}

return resp.Data, nil
}

func (c *BuiltinServicesClient) ObjectList(ctx context.Context, bucket string) ([]*nats.ObjectInfo, error) {
metadata := map[string]string{
BucketContextHeader: bucket,
}

resp, err := c.hsClient.PerformRPC(ctx, builtinServiceNameObjectStore, objectStoreServiceMethodList, []byte{}, metadata)
if err != nil {
return nil, err
}
if resp.IsError() {
return nil, resp.Error()
}

var theList []*nats.ObjectInfo
err = json.Unmarshal(resp.Data, &theList)
if err != nil {
return nil, err
}

return theList, nil
}

func (c *BuiltinServicesClient) ObjectPut(ctx context.Context, bucket string, objectName string, payload []byte) (*nats.ObjectInfo, error) {
metadata := map[string]string{
ObjectStoreObjectNameHeader: objectName,
BucketContextHeader: bucket,
}
resp, err := c.hsClient.PerformRPC(ctx, builtinServiceNameObjectStore, objectStoreServiceMethodPut, payload, metadata)
if err != nil {
return nil, err
}
if resp.IsError() {
return nil, resp.Error()
}

var result nats.ObjectInfo
err = json.Unmarshal(resp.Data, &result)
if err != nil {
return nil, err
}
return &result, nil
}

func (c *BuiltinServicesClient) ObjectDelete(ctx context.Context, bucket string, objectName string) error {
metadata := map[string]string{
ObjectStoreObjectNameHeader: objectName,
BucketContextHeader: bucket,
}
resp, err := c.hsClient.PerformRPC(ctx, builtinServiceNameObjectStore, objectStoreServiceMethodDelete, []byte{}, metadata)
if err != nil {
return err
}
if resp.IsError() {
return resp.Error()
}

var oResp HostServicesObjectStoreResponse
err = json.Unmarshal(resp.Data, &oResp)
if err != nil {
return err
}
if !oResp.Success {
return mergeErrors(oResp.Errors)
}

return nil
}

func (c *BuiltinServicesClient) SimpleHttpRequest(ctx context.Context, method string, url string, payload []byte) (*HostServicesHTTPResponse, error) {
metadata := map[string]string{
HttpURLHeader: url,
}
resp, err := c.hsClient.PerformRPC(ctx, builtinServiceNameHttpClient, method, payload, metadata)
if err != nil {
return nil, err
}
if resp.IsError() {
return nil, resp.Error()
}
var hResp HostServicesHTTPResponse
err = json.Unmarshal(resp.Data, &hResp)
if err != nil {
return nil, err
}

return &hResp, nil
}

func (c *BuiltinServicesClient) RawClient() *hostservices.HostServicesClient {
return c.hsClient
}

func mergeErrors(input []string) error {
results := make([]error, len(input))
for k, s := range input {
results[k] = errors.New(s)
}
return errors.Join(results...)
}
Loading
Loading