Skip to content

Commit

Permalink
Implement a gRPC event trigger to publish events to NATS (#377)
Browse files Browse the repository at this point in the history
* Create event-trigger grpc server

* Dockerise event-trigger

* Helm + kustomize setup

* Replace http webhook client with grpc

* Add to upstream service unit tests

* Rework publishing to be via a PublishHandler interface

* Update helm configuration and docker images

* Add nats event server/nats event bus functional test.

* Change service name to run-completion-event-trigger

* Update sensor datakey for use with new run completion format

* Remove unused event source webhook

* Remove grpc config for deployment in kustomize

* Move helm chart for triggers into statusFeedback eventing loop

* Remove triggers from kustomization config

* Update Protobuf to use Enum and client + server implementations

* Apply suggestions from pr

* Drop nats from grpc client name in controller webhook

* Retain data key within body of run-completion when sent to NATS

* Trigger namespace configurable in helm deployment

* Update trigger to use zapr for logging

* Update webhook to support run completion events with no provided RunName

---------

Co-authored-by: Ankitha-Krisshnan <[email protected]>
Co-authored-by: chr12c <[email protected]>
  • Loading branch information
3 people authored Oct 28, 2024
1 parent 8918170 commit 5e41f9d
Show file tree
Hide file tree
Showing 39 changed files with 3,042 additions and 355 deletions.
21 changes: 18 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ manifests: controller-gen ## Generate WebhookConfiguration, ClusterRole and Cust
generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations.
$(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths="./..."

generate-grpc:
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
triggers/run-completion-event-trigger/proto/run_completion_event_trigger.proto

fmt: ## Run go fmt against code.
go fmt ./...

Expand Down Expand Up @@ -76,7 +81,10 @@ test-argo:
$(MAKE) -C argo/kfp-compiler test
$(MAKE) -C argo/providers test

test-all: test helm-test-operator helm-test-provider test-argo
test-triggers:
$(MAKE) -C triggers/run-completion-event-trigger test functional-test

test-all: test helm-test-operator helm-test-provider test-argo test-triggers

integration-test-all: integration-test
$(MAKE) -C argo/kfp-compiler integration-test
Expand Down Expand Up @@ -214,7 +222,14 @@ docker-push-argo:
$(MAKE) -C argo/kfp-compiler docker-push
$(MAKE) -C argo/providers docker-push

docker-build-triggers:
$(MAKE) -C triggers/run-completion-event-trigger docker-build

docker-push-triggers:
$(MAKE) -C triggers/run-completion-event-trigger docker-push

##@ Docs

website:
$(MAKE) -C docs-gen

Expand All @@ -223,9 +238,9 @@ docker-push-quickstart:

##@ Package

package-all: docker-build docker-build-argo helm-package website
package-all: docker-build docker-build-argo docker-build-triggers helm-package website

publish-all: docker-push docker-push-argo helm-publish
publish-all: docker-push docker-push-argo docker-push-triggers helm-publish

##@ CI

Expand Down
2 changes: 1 addition & 1 deletion apis/config/v1alpha5/kfp_controller_config_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Endpoint struct {
}

func (e Endpoint) URL() string {
return fmt.Sprintf("http://%s:%d%s", e.Host, e.Port, e.Path)
return fmt.Sprintf("%s:%d%s", e.Host, e.Port, e.Path)
}

type ServiceConfiguration struct {
Expand Down
437 changes: 435 additions & 2 deletions argo/providers/go.work.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions argo/providers/vai/eventing_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (es *VaiEventingServer) StartEventSource(source *generic.EventSource, strea

jsonPayload, err := json.Marshal(event)
if err != nil {
es.Logger.Error(err, "failed to marshal event")
es.Logger.Error(err, "failed to marshal event %v", event)
m.Nack()
return
}
Expand All @@ -82,7 +82,7 @@ func (es *VaiEventingServer) StartEventSource(source *generic.EventSource, strea
Name: common.RunCompletionEventName,
Payload: jsonPayload,
}); err != nil {
es.Logger.Error(err, "failed to send event")
es.Logger.Error(err, "failed to send event %v", event)
m.Nack()
return
}
Expand Down
428 changes: 427 additions & 1 deletion argo/status-updater/go.work.sum

Large diffs are not rendered by default.

20 changes: 6 additions & 14 deletions controllers/webhook/runcompletion_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ const (
HttpContentTypeJSON = "application/json"
)

type EventData struct {
Header http.Header `json:"header"`
RunCompletionEvent common.RunCompletionEvent `json:"runCompletionEvent"`
}

type RunCompletionFeed struct {
ctx context.Context
eventProcessor EventProcessor
Expand All @@ -35,7 +30,7 @@ func NewRunCompletionFeed(ctx context.Context, client client.Reader, endpoints [
eventProcessor := NewResourceArtifactsEventProcessor(client)

upstreams := pipelines.Map(endpoints, func(endpoint config.Endpoint) UpstreamService {
return NewArgoEventWebhook(endpoint)
return NewGrpcTrigger(ctx, endpoint)
})

return RunCompletionFeed{
Expand Down Expand Up @@ -74,7 +69,7 @@ type DataWrapper struct {
Data common.RunCompletionEventData `json:"data"`
}

func (rcf RunCompletionFeed) extractEventData(request *http.Request) (*EventData, error) {
func (rcf RunCompletionFeed) extractRunCompletionEvent(request *http.Request) (*common.RunCompletionEvent, error) {
body, err := getRequestBody(rcf.ctx, request)
if err != nil {
return nil, err
Expand All @@ -91,10 +86,7 @@ func (rcf RunCompletionFeed) extractEventData(request *http.Request) (*EventData
} else if rce == nil {
return nil, errors.New("event data is empty")
} else {
return &EventData{
Header: request.Header,
RunCompletionEvent: *rce,
}, nil
return rce, nil
}
}

Expand All @@ -107,14 +99,14 @@ func (rcf RunCompletionFeed) handleEvent(response http.ResponseWriter, request *
http.Error(response, fmt.Sprintf("invalid %s, want `%s`", HttpHeaderContentType, HttpContentTypeJSON), http.StatusUnsupportedMediaType)
return
}
eventData, err := rcf.extractEventData(request)
if err != nil || eventData == nil {
event, err := rcf.extractRunCompletionEvent(request)
if err != nil || event == nil {
logger.Error(err, "Failed to extract body from request")
http.Error(response, err.Error(), http.StatusBadRequest)
return
} else {
for _, upstream := range rcf.upstreams {
err := upstream.call(rcf.ctx, *eventData)
err := upstream.call(rcf.ctx, *event)
if err != nil {
logger.Error(err, "Call to upstream failed")
http.Error(response, err.Error(), http.StatusInternalServerError)
Expand Down
5 changes: 3 additions & 2 deletions controllers/webhook/runcompletion_feed_decoupled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/go-logr/logr"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/sky-uk/kfp-operator/argo/common"
"io"
"net/http"
"net/http/httptest"
Expand All @@ -26,9 +27,9 @@ type MockUpstreamService struct {
expectedBody string
}

func (m MockUpstreamService) call(_ context.Context, ed EventData) error {
func (m MockUpstreamService) call(_ context.Context, event common.RunCompletionEvent) error {
mockUpstreamServiceCallCounter++
passedBodyBytes, err := json.Marshal(ed.RunCompletionEvent)
passedBodyBytes, err := json.Marshal(event)
Expect(err).NotTo(HaveOccurred())
passedBodyStr := string(passedBodyBytes)
if m.expectedBody == "error" {
Expand Down
17 changes: 8 additions & 9 deletions controllers/webhook/runcompletion_feed_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var _ = Context("getRequestBody", func() {
})
})

var _ = Context("extractEventData", func() {
var _ = Context("extractRunCompletionEvent", func() {
logger, _ := common.NewLogger(zapcore.DebugLevel)
ctx := logr.NewContext(context.Background(), logger)
rcf := RunCompletionFeed{ctx: ctx}
Expand All @@ -57,13 +57,12 @@ var _ = Context("extractEventData", func() {
rcf.eventProcessor = processor

req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://example.com/events", bytes.NewReader([]byte("{\"hello\":\"world\"}")))
req.Header.Add("hello", "world")
Expect(err).NotTo(HaveOccurred())
eventData, err := rcf.extractEventData(req)

event, err := rcf.extractRunCompletionEvent(req)
Expect(err).NotTo(HaveOccurred())

Expect(eventData.Header.Get("hello")).To(Equal("world"))
Expect(eventData.RunCompletionEvent).To(Equal(*processor.returnedRunCompletionEvent))
Expect(event).To(Equal(processor.returnedRunCompletionEvent))
})

It("returns error on event processor error", func() {
Expand All @@ -75,7 +74,7 @@ var _ = Context("extractEventData", func() {

req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://example.com/events", bytes.NewReader([]byte("{\"hello\":\"world\"}")))
Expect(err).NotTo(HaveOccurred())
_, err = rcf.extractEventData(req)
_, err = rcf.extractRunCompletionEvent(req)
Expect(err).To(MatchError("an error occurred"))
})

Expand All @@ -88,7 +87,7 @@ var _ = Context("extractEventData", func() {

req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://example.com/events", bytes.NewReader([]byte("{\"hello\":\"world\"}")))
Expect(err).NotTo(HaveOccurred())
_, err = rcf.extractEventData(req)
_, err = rcf.extractRunCompletionEvent(req)
Expect(err).To(MatchError("event data is empty"))
})
})
Expand All @@ -97,7 +96,7 @@ var _ = Context("extractEventData", func() {
It("returns an error", func() {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://example.com/events", bytes.NewReader([]byte("")))
Expect(err).NotTo(HaveOccurred())
_, err = rcf.extractEventData(req)
_, err = rcf.extractRunCompletionEvent(req)
Expect(err.Error()).To(Equal("request body is empty"))
})
})
Expand All @@ -106,7 +105,7 @@ var _ = Context("extractEventData", func() {
It("returns an error", func() {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://example.com/events", bytes.NewReader([]byte("hello world")))
Expect(err).NotTo(HaveOccurred())
_, err = rcf.extractEventData(req)
_, err = rcf.extractRunCompletionEvent(req)
Expect(err.Error()).To(Equal("invalid character 'h' looking for beginning of value"))
})
})
Expand Down
2 changes: 1 addition & 1 deletion controllers/webhook/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func RandomRunCompletionEventData() common.RunCompletionEventData {
runConfigurationName := common.RandomNamespacedName()

return common.RunCompletionEventData{
Status: common.RunCompletionStatus(common.RandomString()),
Status: common.RunCompletionStatuses.Succeeded,
PipelineName: common.NamespacedName{},
RunConfigurationName: &runConfigurationName,
RunName: &runName,
Expand Down
Loading

0 comments on commit 5e41f9d

Please sign in to comment.