Skip to content

Commit

Permalink
Merge pull request #96 from codefresh-io/CR-update-all-apps
Browse files Browse the repository at this point in the history
Cr update all apps
  • Loading branch information
pasha-codefresh authored May 31, 2022
2 parents b3c88d3 + 3e21055 commit bc6ce95
Showing 3 changed files with 131 additions and 96 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.3.4-cap-CR-11890
2.3.4-cap-CR-report-app-event
12 changes: 1 addition & 11 deletions server/application/application.go
Original file line number Diff line number Diff line change
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"math"
"os"
"reflect"
"sort"
"strconv"
@@ -850,15 +849,6 @@ func (s *Server) Watch(q *application.ApplicationQuery, ws application.Applicati
}
}

func shouldProcessNonRootApp() bool {
value := os.Getenv("PROCESS_NON_ROOT_APP")
result, err := strconv.ParseBool(value)
if err != nil {
return false
}
return result
}

func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing_StartEventSourceServer) error {
var (
logCtx log.FieldLogger = log.StandardLogger()
@@ -906,7 +896,7 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing
return
}

err := s.applicationEventReporter.streamApplicationEvents(stream.Context(), &a, es, stream, ts, shouldProcessNonRootApp())
err := s.applicationEventReporter.streamApplicationEvents(stream.Context(), &a, es, stream, ts)
if err != nil {
logCtx.WithError(err).Error("failed to stream application events")
return
213 changes: 129 additions & 84 deletions server/application/application_event_reporter.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,8 @@ import (
"reflect"
"strings"

"github.com/argoproj/argo-cd/v2/common"

"github.com/argoproj/gitops-engine/pkg/health"
"github.com/argoproj/gitops-engine/pkg/utils/kube"
"github.com/argoproj/gitops-engine/pkg/utils/text"
@@ -18,7 +20,6 @@ import (
"github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
"github.com/argoproj/argo-cd/v2/pkg/apiclient/events"
appv1reg "github.com/argoproj/argo-cd/v2/pkg/apis/application"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/reposerver/apiclient"
)
@@ -55,27 +56,64 @@ func (s *applicationEventReporter) shouldSendResourceEvent(a *appv1.Application,
return true
}

func isChildApp(a *appv1.Application) bool {
if a.Labels != nil {
return a.Labels[common.LabelKeyAppInstance] != ""
}
return false
}

func getAppAsResource(a *appv1.Application) *appv1.ResourceStatus {
return &appv1.ResourceStatus{
Name: a.Name,
Namespace: a.Namespace,
Version: "v1alpha1",
Kind: "Application",
Group: "argoproj.io",
Status: a.Status.Sync.Status,
Health: &a.Status.Health,
}
}

func (s *applicationEventReporter) getDesiredManifests(ctx context.Context, a *appv1.Application, logCtx *log.Entry) (*apiclient.ManifestResponse, error, bool) {
// get the desired state manifests of the application
desiredManifests, err := s.server.GetManifests(ctx, &application.ApplicationManifestQuery{
Name: &a.Name,
Revision: a.Status.Sync.Revision,
})
if err != nil {
if !strings.Contains(err.Error(), "Manifest generation error") {
return nil, fmt.Errorf("failed to get application desired state manifests: %w", err), false
}
// if it's manifest generation error we need to still report the actual state
// of the resources, but since we can't get the desired state, we will report
// each resource with empty desired state
logCtx.WithError(err).Warn("failed to get application desired state manifests, reporting actual state only")
desiredManifests = &apiclient.ManifestResponse{Manifests: []*apiclient.Manifest{}}
return desiredManifests, nil, true // will ignore requiresPruning=true to not delete resources with actual state
}
return desiredManifests, nil, false
}

func (s *applicationEventReporter) streamApplicationEvents(
ctx context.Context,
a *appv1.Application,
es *events.EventSource,
stream events.Eventing_StartEventSourceServer,
ts string,
processRootApp bool,
) error {
var (
logCtx = log.WithField("application", a.Name)
manifestGenErr bool
logCtx = log.WithField("application", a.Name)
)

logCtx.Info("streaming application events")

isChildApp := false
if a.Labels != nil {
isChildApp = a.Labels["app.kubernetes.io/instance"] != ""
appTree, err := s.server.getAppResources(ctx, a)
if err != nil {
return fmt.Errorf("failed to get application resources tree: %w", err)
}

if !isChildApp || processRootApp {
if !isChildApp(a) {
// application events for child apps would be sent by its parent app
// as resource event
appEvent, err := s.getApplicationEventPayload(ctx, a, es, ts)
@@ -92,106 +130,113 @@ func (s *applicationEventReporter) streamApplicationEvents(
if err := stream.Send(appEvent); err != nil {
return fmt.Errorf("failed to send event for resource %s/%s: %w", a.Namespace, a.Name, err)
}
}
} else {
parentApp := a.Labels[common.LabelKeyAppInstance]

// get the desired state manifests of the application
desiredManifests, err := s.server.GetManifests(ctx, &application.ApplicationManifestQuery{
Name: &a.Name,
Revision: a.Status.Sync.Revision,
})
if err != nil {
if !strings.Contains(err.Error(), "Manifest generation error") {
return fmt.Errorf("failed to get application desired state manifests: %w", err)
parentApplicationEntity, err := s.server.Get(ctx, &application.ApplicationQuery{
Name: &parentApp,
})
if err != nil {
return fmt.Errorf("failed to get application event: %w", err)
}
// if it's manifest generation error we need to still report the actual state
// of the resources, but since we can't get the desired state, we will report
// each resource with empty desired state
logCtx.WithError(err).Warn("failed to get application desired state manifests, reporting actual state only")
desiredManifests = &apiclient.ManifestResponse{Manifests: []*apiclient.Manifest{}}
manifestGenErr = true // will ignore requiresPruning=true to not delete resources with actual state

rs := getAppAsResource(a)

desiredManifests, err, manifestGenErr := s.getDesiredManifests(ctx, parentApplicationEntity, logCtx)
if err != nil {
return err
}

s.processResource(ctx, *rs, parentApplicationEntity, logCtx, ts, desiredManifests, stream, appTree, es, manifestGenErr)
}

appTree, err := s.server.getAppResources(ctx, a)
desiredManifests, err, manifestGenErr := s.getDesiredManifests(ctx, a, logCtx)
if err != nil {
return fmt.Errorf("failed to get application resources tree: %w", err)
return err
}

// for each resource in the application get desired and actual state,
// then stream the event
for _, rs := range a.Status.Resources {
logCtx = logCtx.WithFields(log.Fields{
"gvk": fmt.Sprintf("%s/%s/%s", rs.Group, rs.Version, rs.Kind),
"resource": fmt.Sprintf("%s/%s", rs.Namespace, rs.Name),
})

if !s.shouldSendResourceEvent(a, rs) {
if isApp(rs) {
continue
}
s.processResource(ctx, rs, a, logCtx, ts, desiredManifests, stream, appTree, es, manifestGenErr)
}

// get resource desired state
desiredState := getResourceDesiredState(&rs, desiredManifests, logCtx)

// get resource actual state
actualState, err := s.server.GetResource(ctx, &application.ApplicationResourceRequest{
Name: &a.Name,
Namespace: rs.Namespace,
ResourceName: rs.Name,
Version: rs.Version,
Group: rs.Group,
Kind: rs.Kind,
})
if err != nil {
if !strings.Contains(err.Error(), "not found") {
logCtx.WithError(err).Error("failed to get actual state")
continue
}
return nil
}

// empty actual state
actualState = &application.ApplicationResourceResponse{Manifest: ""}
}
func (s *applicationEventReporter) processResource(ctx context.Context, rs appv1.ResourceStatus, a *appv1.Application, logCtx *log.Entry, ts string, desiredManifests *apiclient.ManifestResponse, stream events.Eventing_StartEventSourceServer, appTree *appv1.ApplicationTree, es *events.EventSource, manifestGenErr bool) {
logCtx = logCtx.WithFields(log.Fields{
"gvk": fmt.Sprintf("%s/%s/%s", rs.Group, rs.Version, rs.Kind),
"resource": fmt.Sprintf("%s/%s", rs.Namespace, rs.Name),
})

var mr *apiclient.ManifestResponse = desiredManifests
if isApp(rs) {
app := &v1alpha1.Application{}
if err := json.Unmarshal([]byte(actualState.Manifest), app); err != nil {
logWithAppStatus(a, logCtx, ts).WithError(err).Error("failed to unmarshal child application resource")
}
resourceDesiredManifests, err := s.server.GetManifests(ctx, &application.ApplicationManifestQuery{
Name: &rs.Name,
Revision: app.Status.Sync.Revision,
})
if err != nil {
logWithAppStatus(a, logCtx, ts).WithError(err).Error("failed to get resource desired manifest")
} else {
mr = resourceDesiredManifests
}
}
if !s.shouldSendResourceEvent(a, rs) {
return
}

ev, err := getResourceEventPayload(a, &rs, es, actualState, desiredState, mr, appTree, manifestGenErr, ts)
if err != nil {
logCtx.WithError(err).Error("failed to get event payload")
continue
// get resource desired state
desiredState := getResourceDesiredState(&rs, desiredManifests, logCtx)

// get resource actual state
actualState, err := s.server.GetResource(ctx, &application.ApplicationResourceRequest{
Name: &a.Name,
Namespace: rs.Namespace,
ResourceName: rs.Name,
Version: rs.Version,
Group: rs.Group,
Kind: rs.Kind,
})
if err != nil {
if !strings.Contains(err.Error(), "not found") {
logCtx.WithError(err).Error("failed to get actual state")
return
}

appRes := appv1.Application{}
if isApp(rs) && actualState.Manifest != "" && json.Unmarshal([]byte(actualState.Manifest), &appRes) == nil {
logWithAppStatus(&appRes, logCtx, ts).Info("streaming resource event")
// empty actual state
actualState = &application.ApplicationResourceResponse{Manifest: ""}
}

var mr *apiclient.ManifestResponse = desiredManifests
if isApp(rs) {
app := &appv1.Application{}
if err := json.Unmarshal([]byte(actualState.Manifest), app); err != nil {
logWithAppStatus(a, logCtx, ts).WithError(err).Error("failed to unmarshal child application resource")
}
resourceDesiredManifests, err := s.server.GetManifests(ctx, &application.ApplicationManifestQuery{
Name: &rs.Name,
Revision: app.Status.Sync.Revision,
})
if err != nil {
logWithAppStatus(a, logCtx, ts).WithError(err).Error("failed to get resource desired manifest")
} else {
logCtx.Info("streaming resource event")
mr = resourceDesiredManifests
}
}

if err := stream.Send(ev); err != nil {
logCtx.WithError(err).Error("failed to send even")
continue
}
ev, err := getResourceEventPayload(a, &rs, es, actualState, desiredState, mr, appTree, manifestGenErr, ts)
if err != nil {
logCtx.WithError(err).Error("failed to get event payload")
return
}

if err := s.server.cache.SetLastResourceEvent(a, rs, resourceEventCacheExpiration); err != nil {
logCtx.WithError(err).Error("failed to cache resource event")
continue
}
appRes := appv1.Application{}
if isApp(rs) && actualState.Manifest != "" && json.Unmarshal([]byte(actualState.Manifest), &appRes) == nil {
logWithAppStatus(&appRes, logCtx, ts).Info("streaming resource event")
} else {
logCtx.Info("streaming resource event")
}

return nil
if err := stream.Send(ev); err != nil {
logCtx.WithError(err).Error("failed to send even")
return
}

if err := s.server.cache.SetLastResourceEvent(a, rs, resourceEventCacheExpiration); err != nil {
logCtx.WithError(err).Error("failed to cache resource event")
}
}

func (s *applicationEventReporter) shouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) bool {

0 comments on commit bc6ce95

Please sign in to comment.