-
Notifications
You must be signed in to change notification settings - Fork 75
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
✨ add grpc option for cloudevents client. #302
Merged
openshift-merge-bot
merged 6 commits into
open-cluster-management-io:main
from
morvencao:br_cloudevents_grpc_option
Dec 19, 2023
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
1672cbe
add grpc option for cloudevents client.
morvencao 117a360
remove unused files.
morvencao 55c9f2c
enhance cloudevents test.
morvencao 65dafeb
use config for grpc options.
morvencao 3012fe5
add doc for protobuf update.
morvencao 1bc3d90
add grpc options to cloudevents README.
morvencao File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
package grpc | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"strings" | ||
|
||
cloudevents "github.com/cloudevents/sdk-go/v2" | ||
cecontext "github.com/cloudevents/sdk-go/v2/context" | ||
|
||
"open-cluster-management.io/api/cloudevents/generic/options" | ||
"open-cluster-management.io/api/cloudevents/generic/options/grpc/protocol" | ||
"open-cluster-management.io/api/cloudevents/generic/types" | ||
) | ||
|
||
type grpcAgentOptions struct { | ||
GRPCOptions | ||
errorChan chan error // grpc client connection doesn't have error channel, it will handle reconnecting automatically | ||
clusterName string | ||
} | ||
|
||
func NewAgentOptions(grpcOptions *GRPCOptions, clusterName, agentID string) *options.CloudEventsAgentOptions { | ||
return &options.CloudEventsAgentOptions{ | ||
CloudEventsOptions: &grpcAgentOptions{ | ||
GRPCOptions: *grpcOptions, | ||
errorChan: make(chan error), | ||
clusterName: clusterName, | ||
}, | ||
AgentID: agentID, | ||
ClusterName: clusterName, | ||
} | ||
} | ||
|
||
func (o *grpcAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.EventContext) (context.Context, error) { | ||
eventType, err := types.ParseCloudEventsType(evtCtx.GetType()) | ||
if err != nil { | ||
return nil, fmt.Errorf("unsupported event type %s, %v", eventType, err) | ||
} | ||
|
||
if eventType.Action == types.ResyncRequestAction { | ||
// agent publishes event to spec resync topic to request to get resources spec from all sources | ||
topic := strings.Replace(SpecResyncTopic, "+", o.clusterName, -1) | ||
return cecontext.WithTopic(ctx, topic), nil | ||
} | ||
|
||
// agent publishes event to status topic to send the resource status from a specified cluster | ||
originalSource, err := evtCtx.GetExtension(types.ExtensionOriginalSource) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
statusTopic := strings.Replace(StatusTopic, "+", fmt.Sprintf("%s", originalSource), 1) | ||
statusTopic = strings.Replace(statusTopic, "+", o.clusterName, -1) | ||
return cecontext.WithTopic(ctx, statusTopic), nil | ||
} | ||
|
||
func (o *grpcAgentOptions) Client(ctx context.Context) (cloudevents.Client, error) { | ||
receiver, err := o.GetCloudEventsClient( | ||
ctx, | ||
protocol.WithPublishOption(&protocol.PublishOption{}), | ||
protocol.WithSubscribeOption(&protocol.SubscribeOption{ | ||
Topics: []string{ | ||
replaceNth(SpecTopic, "+", o.clusterName, 2), // receiving the resources spec from sources with spec topic | ||
StatusResyncTopic, // receiving the resources status resync request from sources with status resync topic | ||
}, | ||
}), | ||
) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return receiver, nil | ||
} | ||
|
||
func (o *grpcAgentOptions) ErrorChan() <-chan error { | ||
return o.errorChan | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
package grpc | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
cloudevents "github.com/cloudevents/sdk-go/v2" | ||
cloudeventscontext "github.com/cloudevents/sdk-go/v2/context" | ||
|
||
"open-cluster-management.io/api/cloudevents/generic/types" | ||
) | ||
|
||
var mockEventDataType = types.CloudEventsDataType{ | ||
Group: "resources.test", | ||
Version: "v1", | ||
Resource: "mockresources", | ||
} | ||
|
||
func TestAgentContext(t *testing.T) { | ||
cases := []struct { | ||
name string | ||
event cloudevents.Event | ||
expectedTopic string | ||
assertError func(error) | ||
}{ | ||
{ | ||
name: "unsupported event", | ||
event: func() cloudevents.Event { | ||
evt := cloudevents.NewEvent() | ||
evt.SetType("unsupported") | ||
return evt | ||
}(), | ||
assertError: func(err error) { | ||
if err == nil { | ||
t.Errorf("expected error, but failed") | ||
} | ||
}, | ||
}, | ||
{ | ||
name: "resync specs", | ||
event: func() cloudevents.Event { | ||
eventType := types.CloudEventsType{ | ||
CloudEventsDataType: mockEventDataType, | ||
SubResource: types.SubResourceSpec, | ||
Action: types.ResyncRequestAction, | ||
} | ||
|
||
evt := cloudevents.NewEvent() | ||
evt.SetType(eventType.String()) | ||
evt.SetExtension("clustername", "cluster1") | ||
return evt | ||
}(), | ||
expectedTopic: "sources/clusters/cluster1/specresync", | ||
assertError: func(err error) { | ||
if err != nil { | ||
t.Errorf("unexpected error %v", err) | ||
} | ||
}, | ||
}, | ||
{ | ||
name: "send status no original source", | ||
event: func() cloudevents.Event { | ||
eventType := types.CloudEventsType{ | ||
CloudEventsDataType: mockEventDataType, | ||
SubResource: types.SubResourceStatus, | ||
Action: "test", | ||
} | ||
|
||
evt := cloudevents.NewEvent() | ||
evt.SetSource("hub1") | ||
evt.SetType(eventType.String()) | ||
return evt | ||
}(), | ||
assertError: func(err error) { | ||
if err == nil { | ||
t.Errorf("expected error, but failed") | ||
} | ||
}, | ||
}, | ||
{ | ||
name: "send status", | ||
event: func() cloudevents.Event { | ||
eventType := types.CloudEventsType{ | ||
CloudEventsDataType: mockEventDataType, | ||
SubResource: types.SubResourceStatus, | ||
Action: "test", | ||
} | ||
|
||
evt := cloudevents.NewEvent() | ||
evt.SetSource("agent") | ||
evt.SetType(eventType.String()) | ||
evt.SetExtension("originalsource", "hub1") | ||
return evt | ||
}(), | ||
expectedTopic: "sources/hub1/clusters/cluster1/status", | ||
assertError: func(err error) { | ||
if err != nil { | ||
t.Errorf("unexpected error %v", err) | ||
} | ||
}, | ||
}, | ||
} | ||
|
||
for _, c := range cases { | ||
t.Run(c.name, func(t *testing.T) { | ||
agentOptions := &grpcAgentOptions{clusterName: "cluster1"} | ||
ctx, err := agentOptions.WithContext(context.TODO(), c.event.Context) | ||
c.assertError(err) | ||
|
||
topic := func(ctx context.Context) string { | ||
if ctx == nil { | ||
return "" | ||
} | ||
|
||
return cloudeventscontext.TopicFrom(ctx) | ||
}(ctx) | ||
|
||
if topic != c.expectedTopic { | ||
t.Errorf("expected %s, but got %s", c.expectedTopic, topic) | ||
} | ||
}) | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need this for grpc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We require this information to assign the topic for event publish, and the topic will be included in the gRPC publish request, see:
api/cloudevents/generic/options/grpc/protobuf/v1/cloudevent.proto
Lines 68 to 73 in 3012fe5