Skip to content

Commit

Permalink
perf: add cluster workflow template informer
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Buczak <[email protected]>
  • Loading branch information
jakkubu committed Oct 8, 2024
1 parent 3d90e33 commit 9dac1ce
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 7 deletions.
9 changes: 7 additions & 2 deletions pkg/apiclient/argo-kube-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type argoKubeClient struct {
instanceIDService instanceid.Service
wfClient workflow.Interface
wfInformer types.WorkflowTemplateStore
cwfInformer types.ClusterWorkflowTemplateStore
}

var _ Client = &argoKubeClient{}
Expand Down Expand Up @@ -68,6 +69,10 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig,
if err != nil {
return nil, nil, err
}
cwftmplInformer, err := clusterworkflowtmplserver.NewInformer(restConfig)
if err != nil {
return nil, nil, err
}
eventSourceInterface, err := eventsource.NewForConfig(restConfig)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -95,13 +100,13 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig,
if err != nil {
return nil, nil, err
}
return ctx, &argoKubeClient{instanceIDService, wfClient, wftmplInformer}, nil
return ctx, &argoKubeClient{instanceIDService, wfClient, wftmplInformer, cwftmplInformer}, nil
}

func (a *argoKubeClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient {
wfArchive := sqldb.NullWorkflowArchive
wfLister := store.NewKubeLister(a.wfClient)
return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfArchive, a.wfClient, wfLister, nil, a.wfInformer, nil)}}
return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfArchive, a.wfClient, wfLister, nil, a.wfInformer, a.cwfInformer, nil)}}
}

func (a *argoKubeClient) NewCronWorkflowServiceClient() (cronworkflow.CronWorkflowServiceClient, error) {
Expand Down
6 changes: 5 additions & 1 deletion server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
if err != nil {
log.Fatal(err)
}
cwftmplInformer, err := clusterworkflowtemplate.NewInformer(as.restConfig)
if err != nil {
log.Fatal(err)
}
eventRecorderManager := events.NewEventRecorderManager(as.clients.Kubernetes)
artifactRepositories := artifactrepositories.New(as.clients.Kubernetes, as.managedNamespace, &config.ArtifactRepository)
artifactServer := artifacts.NewArtifactServer(as.gatekeeper, hydrator.New(offloadRepo), wfArchive, instanceIDService, artifactRepositories)
Expand All @@ -243,7 +247,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
if err != nil {
log.Fatal(err)
}
workflowServer := workflow.NewWorkflowServer(instanceIDService, offloadRepo, wfArchive, as.clients.Workflow, wfStore, wfStore, wftmplInformer, &resourceCacheNamespace)
workflowServer := workflow.NewWorkflowServer(instanceIDService, offloadRepo, wfArchive, as.clients.Workflow, wfStore, wfStore, wftmplInformer, cwftmplInformer, &resourceCacheNamespace)
grpcServer := as.newGRPCServer(instanceIDService, workflowServer, wftmplInformer, wfArchiveServer, eventServer, config.Links, config.Columns, config.NavColor)
httpServer := as.newHTTPServer(ctx, port, artifactServer)

Expand Down
72 changes: 72 additions & 0 deletions server/clusterworkflowtemplate/informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package clusterworkflowtemplate

import (
"time"

log "github.com/sirupsen/logrus"

"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1"
clientv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/listers/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/server/types"
"github.com/argoproj/argo-workflows/v3/workflow/controller/informer"
"github.com/argoproj/argo-workflows/v3/workflow/templateresolution"
)

const (
workflowTemplateResyncPeriod = 20 * time.Minute
)

var _ types.ClusterWorkflowTemplateStore = &Informer{}

type Informer struct {
informer wfextvv1alpha1.ClusterWorkflowTemplateInformer
}

func NewInformer(restConfig *rest.Config) (*Informer, error) {
dynamicInterface, err := dynamic.NewForConfig(restConfig)
if err != nil {
return nil, err
}
informer := informer.NewTolerantClusterWorkflowTemplateInformer(
dynamicInterface,
workflowTemplateResyncPeriod,
)
return &Informer{
informer: informer,
}, nil
}

// Start informer in separate go-routine and block until cache sync
func (cwti *Informer) Run(stopCh <-chan struct{}) {

cwti.informer.Informer()

go cwti.informer.Informer().Run(stopCh)

if !cache.WaitForCacheSync(
stopCh,
cwti.informer.Informer().HasSynced,
) {
log.Fatal("Timed out waiting for caches to sync")
}
}

// if namespace contains empty string Lister will use the namespace provided during initialization
func (cwti *Informer) Lister(namespace string) clientv1alpha1.ClusterWorkflowTemplateLister {
if cwti.informer == nil {
log.Fatal("Template informer not started")
}
return cwti.informer.Lister()
}

// if namespace contains empty string Lister will use the namespace provided during initialization
func (cwti *Informer) Getter() templateresolution.ClusterWorkflowTemplateGetter {
if cwti.informer == nil {
log.Fatal("Template informer not started")
}
return cwti.informer.Lister()
}
3 changes: 3 additions & 0 deletions server/types/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ import "github.com/argoproj/argo-workflows/v3/workflow/templateresolution"
type WorkflowTemplateStore interface {
Getter(namespace string) templateresolution.WorkflowTemplateNamespacedGetter
}
type ClusterWorkflowTemplateStore interface {
Getter() templateresolution.ClusterWorkflowTemplateGetter
}
17 changes: 13 additions & 4 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,21 @@ type workflowServer struct {
wfLister store.WorkflowLister
wfReflector *cache.Reflector
wftmplStore servertypes.WorkflowTemplateStore
cwftmplStore servertypes.ClusterWorkflowTemplateStore
}

var _ workflowpkg.WorkflowServiceServer = &workflowServer{}

// NewWorkflowServer returns a new WorkflowServer
func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive, wfClientSet versioned.Interface, wfLister store.WorkflowLister, wfStore store.WorkflowStore, wftmplStore servertypes.WorkflowTemplateStore, namespace *string) *workflowServer {
func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive, wfClientSet versioned.Interface, wfLister store.WorkflowLister, wfStore store.WorkflowStore, wftmplStore servertypes.WorkflowTemplateStore, cwftmplStore servertypes.ClusterWorkflowTemplateStore, namespace *string) *workflowServer {
ws := &workflowServer{
instanceIDService: instanceIDService,
offloadNodeStatusRepo: offloadNodeStatusRepo,
hydrator: hydrator.New(offloadNodeStatusRepo),
wfArchive: wfArchive,
wfLister: wfLister,
wftmplStore: wftmplStore,
cwftmplStore: cwftmplStore,
}
if wfStore != nil && namespace != nil {
lw := &cache.ListWatch{
Expand All @@ -93,6 +95,13 @@ func (s *workflowServer) wftmplGetter(wfClient versioned.Interface, namespace st
return templateresolution.WrapWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().WorkflowTemplates(namespace))
}

func (s *workflowServer) cwftmplGetter(wfClient versioned.Interface) templateresolution.ClusterWorkflowTemplateGetter {
if s.wftmplStore != nil {
return s.cwftmplStore.Getter()
}
return templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates())
}

func (s *workflowServer) Run(stopCh <-chan struct{}) {
if s.wfReflector != nil {
s.wfReflector.Run(stopCh)
Expand All @@ -114,7 +123,7 @@ func (s *workflowServer) CreateWorkflow(ctx context.Context, req *workflowpkg.Wo
creator.Label(ctx, req.Workflow)

wftmplGetter := s.wftmplGetter(wfClient, req.Workflow.Namespace)
cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates())
cwftmplGetter := s.cwftmplGetter(wfClient)

err := validate.ValidateWorkflow(wftmplGetter, cwftmplGetter, req.Workflow, validate.ValidateOpts{})
if err != nil {
Expand Down Expand Up @@ -671,7 +680,7 @@ func (s *workflowServer) LintWorkflow(ctx context.Context, req *workflowpkg.Work
}
wfClient := auth.GetWfClient(ctx)
wftmplGetter := s.wftmplGetter(wfClient, req.Workflow.Namespace)
cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates())
cwftmplGetter := s.cwftmplGetter(wfClient)
s.instanceIDService.Label(req.Workflow)
creator.Label(ctx, req.Workflow)

Expand Down Expand Up @@ -785,7 +794,7 @@ func (s *workflowServer) SubmitWorkflow(ctx context.Context, req *workflowpkg.Wo
}

wftmplGetter := s.wftmplGetter(wfClient, req.Namespace)
cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates())
cwftmplGetter := s.cwftmplGetter(wfClient)

err = validate.ValidateWorkflow(wftmplGetter, cwftmplGetter, wf, validate.ValidateOpts{Submit: true})
if err != nil {
Expand Down

0 comments on commit 9dac1ce

Please sign in to comment.