Skip to content

Commit

Permalink
add grpc option for cloudevents client.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Dec 13, 2023
1 parent bab1208 commit e71019b
Show file tree
Hide file tree
Showing 185 changed files with 42,603 additions and 65 deletions.
7 changes: 7 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": []
}
76 changes: 76 additions & 0 deletions cloudevents/generic/options/grpc/agentoptions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package grpc

import (
"context"
"fmt"
"strings"

cloudevents "github.com/cloudevents/sdk-go/v2"
cecontext "github.com/cloudevents/sdk-go/v2/context"

"open-cluster-management.io/api/cloudevents/generic/options"
"open-cluster-management.io/api/cloudevents/generic/options/grpc/protocol"
"open-cluster-management.io/api/cloudevents/generic/types"
)

type grpcAgentOptions struct {
GRPCOptions
errorChan chan error // grpc client connection doesn't have error channel, it will handle reconnecting automatically
clusterName string
}

func NewAgentOptions(grpcOptions *GRPCOptions, clusterName, agentID string) *options.CloudEventsAgentOptions {
return &options.CloudEventsAgentOptions{
CloudEventsOptions: &grpcAgentOptions{
GRPCOptions: *grpcOptions,
errorChan: make(chan error),
clusterName: clusterName,
},
AgentID: agentID,
ClusterName: clusterName,
}
}

func (o *grpcAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.EventContext) (context.Context, error) {
eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
if err != nil {
return nil, fmt.Errorf("unsupported event type %s, %v", eventType, err)
}

if eventType.Action == types.ResyncRequestAction {
// agent publishes event to spec resync topic to request to get resources spec from all sources
topic := strings.Replace(SpecResyncTopic, "+", o.clusterName, -1)
return cecontext.WithTopic(ctx, topic), nil
}

// agent publishes event to status topic to send the resource status from a specified cluster
originalSource, err := evtCtx.GetExtension(types.ExtensionOriginalSource)
if err != nil {
return nil, err
}

statusTopic := strings.Replace(StatusTopic, "+", fmt.Sprintf("%s", originalSource), 1)
statusTopic = strings.Replace(statusTopic, "+", o.clusterName, -1)
return cecontext.WithTopic(ctx, statusTopic), nil
}

func (o *grpcAgentOptions) Client(ctx context.Context) (cloudevents.Client, error) {
receiver, err := o.GetCloudEventsClient(
ctx,
protocol.WithPublishOption(&protocol.PublishOption{}),
protocol.WithSubscribeOption(&protocol.SubscribeOption{
Topics: []string{
replaceNth(SpecTopic, "+", o.clusterName, 2), // receiving the resources spec from sources with spec topic
StatusResyncTopic, // receiving the resources status resync request from sources with status resync topic
},
}),
)
if err != nil {
return nil, err
}
return receiver, nil
}

func (o *grpcAgentOptions) ErrorChan() <-chan error {
return o.errorChan
}
123 changes: 123 additions & 0 deletions cloudevents/generic/options/grpc/agentoptions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package grpc

import (
"context"
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventscontext "github.com/cloudevents/sdk-go/v2/context"

"open-cluster-management.io/api/cloudevents/generic/types"
)

var mockEventDataType = types.CloudEventsDataType{
Group: "resources.test",
Version: "v1",
Resource: "mockresources",
}

func TestAgentContext(t *testing.T) {
cases := []struct {
name string
event cloudevents.Event
expectedTopic string
assertError func(error)
}{
{
name: "unsupported event",
event: func() cloudevents.Event {
evt := cloudevents.NewEvent()
evt.SetType("unsupported")
return evt
}(),
assertError: func(err error) {
if err == nil {
t.Errorf("expected error, but failed")
}
},
},
{
name: "resync specs",
event: func() cloudevents.Event {
eventType := types.CloudEventsType{
CloudEventsDataType: mockEventDataType,
SubResource: types.SubResourceSpec,
Action: types.ResyncRequestAction,
}

evt := cloudevents.NewEvent()
evt.SetType(eventType.String())
evt.SetExtension("clustername", "cluster1")
return evt
}(),
expectedTopic: "sources/clusters/cluster1/specresync",
assertError: func(err error) {
if err != nil {
t.Errorf("unexpected error %v", err)
}
},
},
{
name: "send status no original source",
event: func() cloudevents.Event {
eventType := types.CloudEventsType{
CloudEventsDataType: mockEventDataType,
SubResource: types.SubResourceStatus,
Action: "test",
}

evt := cloudevents.NewEvent()
evt.SetSource("hub1")
evt.SetType(eventType.String())
return evt
}(),
assertError: func(err error) {
if err == nil {
t.Errorf("expected error, but failed")
}
},
},
{
name: "send status",
event: func() cloudevents.Event {
eventType := types.CloudEventsType{
CloudEventsDataType: mockEventDataType,
SubResource: types.SubResourceStatus,
Action: "test",
}

evt := cloudevents.NewEvent()
evt.SetSource("agent")
evt.SetType(eventType.String())
evt.SetExtension("originalsource", "hub1")
return evt
}(),
expectedTopic: "sources/hub1/clusters/cluster1/status",
assertError: func(err error) {
if err != nil {
t.Errorf("unexpected error %v", err)
}
},
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
agentOptions := &grpcAgentOptions{clusterName: "cluster1"}
ctx, err := agentOptions.WithContext(context.TODO(), c.event.Context)
c.assertError(err)

topic := func(ctx context.Context) string {
if ctx == nil {
return ""
}

return cloudeventscontext.TopicFrom(ctx)
}(ctx)

if topic != c.expectedTopic {
t.Errorf("expected %s, but got %s", c.expectedTopic, topic)
}
})
}
}
130 changes: 130 additions & 0 deletions cloudevents/generic/options/grpc/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package grpc

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"strings"

"github.com/spf13/pflag"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

cloudevents "github.com/cloudevents/sdk-go/v2"

"open-cluster-management.io/api/cloudevents/generic/options/grpc/protocol"
)

const (
// SpecTopic is a pubsub topic for resource spec.
SpecTopic = "sources/+/clusters/+/spec"

// StatusTopic is a pubsub topic for resource status.
StatusTopic = "sources/+/clusters/+/status"

// SpecResyncTopic is a pubsub topic for resource spec resync.
SpecResyncTopic = "sources/clusters/+/specresync"

// StatusResyncTopic is a pubsub topic for resource status resync.
StatusResyncTopic = "sources/+/clusters/statusresync"
)

type GRPCOptions struct {
Host string
Port int
CAFile string
ClientCertFile string
ClientKeyFile string
}

func NewGRPCOptions() *GRPCOptions {
return &GRPCOptions{}
}

func (o *GRPCOptions) AddFlags(flags *pflag.FlagSet) {
flags.StringVar(&o.Host, "grpc-host", o.Host, "The host of grpc server")
flags.IntVar(&o.Port, "grpc-port", o.Port, "The port of grpc server")
flags.StringVar(&o.CAFile, "server-ca", o.CAFile, "A file containing trusted CA certificates for server")
flags.StringVar(&o.ClientCertFile, "client-certificate", o.ClientCertFile, "The grpc client certificate file")
flags.StringVar(&o.ClientKeyFile, "client-key", o.ClientKeyFile, "The grpc client private key file")
}

func (o *GRPCOptions) GetGRPCClientConn() (*grpc.ClientConn, error) {
if len(o.CAFile) != 0 {
certPool, err := x509.SystemCertPool()
if err != nil {
return nil, err
}

caPEM, err := os.ReadFile(o.CAFile)
if err != nil {
return nil, err
}

if ok := certPool.AppendCertsFromPEM(caPEM); !ok {
return nil, fmt.Errorf("invalid CA %s", o.CAFile)
}

clientCerts, err := tls.LoadX509KeyPair(o.ClientCertFile, o.ClientKeyFile)
if err != nil {
return nil, err
}

tlsConfig := &tls.Config{
Certificates: []tls.Certificate{clientCerts},
RootCAs: certPool,
MinVersion: tls.VersionTLS13,
MaxVersion: tls.VersionTLS13,
}

conn, err := grpc.Dial(fmt.Sprintf("%s:%d", o.Host, o.Port), grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
if err != nil {
return nil, fmt.Errorf("failed to connect to grpc server %s:%d, %v", o.Host, o.Port, err)
}

return conn, nil
}

conn, err := grpc.Dial(fmt.Sprintf("%s:%d", o.Host, o.Port), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, fmt.Errorf("failed to connect to grpc server %s:%d, %v", o.Host, o.Port, err)
}

return conn, nil
}

func (o *GRPCOptions) GetCloudEventsClient(ctx context.Context, clientOpts ...protocol.Option) (cloudevents.Client, error) {
conn, err := o.GetGRPCClientConn()
if err != nil {
return nil, err
}

opts := []protocol.Option{}
opts = append(opts, clientOpts...)
p, err := protocol.NewProtocol(conn, opts...)
if err != nil {
return nil, err
}

return cloudevents.NewClient(p)
}

// Replace the nth occurrence of old in str by new.
func replaceNth(str, old, new string, n int) string {
i := 0
for m := 1; m <= n; m++ {
x := strings.Index(str[i:], old)
if x < 0 {
break
}
i += x
if m == n {
return str[:i] + new + str[i+len(old):]
}
i += len(old)
}
return str
}
50 changes: 50 additions & 0 deletions cloudevents/generic/options/grpc/protobuf/datacodec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package protobuf

import (
"context"
"fmt"

"google.golang.org/protobuf/proto"

"github.com/cloudevents/sdk-go/v2/event/datacodec"
)

const (
// ContentTypeProtobuf indicates that the data attribute is a protobuf
// message.
ContentTypeProtobuf = "application/protobuf"
)

func init() {
datacodec.AddDecoder(ContentTypeProtobuf, DecodeData)
datacodec.AddEncoder(ContentTypeProtobuf, EncodeData)
}

// DecodeData converts an encoded protobuf message back into the message (out).
// The message must be a type compatible with whatever was given to EncodeData.
func DecodeData(ctx context.Context, in []byte, out interface{}) error {
outmsg, ok := out.(proto.Message)
if !ok {
return fmt.Errorf("can only decode protobuf into proto.Message. got %T", out)
}
if err := proto.Unmarshal(in, outmsg); err != nil {
return fmt.Errorf("failed to unmarshal message: %s", err)
}
return nil
}

// EncodeData a protobuf message to bytes.
//
// Like the official datacodec implementations, this one returns the given value
// as-is if it is already a byte slice.
func EncodeData(ctx context.Context, in interface{}) ([]byte, error) {
if b, ok := in.([]byte); ok {
return b, nil
}
var pbmsg proto.Message
var ok bool
if pbmsg, ok = in.(proto.Message); !ok {
return nil, fmt.Errorf("protobuf encoding only works with protobuf messages. got %T", in)
}
return proto.Marshal(pbmsg)
}
Loading

0 comments on commit e71019b

Please sign in to comment.