Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reporter rate limiter #275

Merged
merged 6 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
### Event reporter v2.0.3
1. Implemented a fixed window rate limiter for the event reporter to prevent the application from overflowing the entire reporter queue. This enhancement ensures timely reporting without causing delays for other applications.
15 changes: 14 additions & 1 deletion cmd/event-reporter-server/commands/event_reporter_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package commands
import (
"context"
"fmt"
"github.com/argoproj/argo-cd/v2/event_reporter/reporter"
"math"
"time"

Expand Down Expand Up @@ -93,6 +94,10 @@ func NewCommand() *cobra.Command {
shardingAlgorithm string
rootpath string
useGrpc bool

rateLimiterEnabled bool
rateLimiterBucketSize int
rateLimiterDuration time.Duration
)
var command = &cobra.Command{
Use: cliName,
Expand Down Expand Up @@ -172,6 +177,11 @@ func NewCommand() *cobra.Command {
BaseURL: codefreshUrl,
AuthToken: codefreshToken,
},
RateLimiterOpts: &reporter.RateLimiterOpts{
Enabled: rateLimiterEnabled,
Rate: rateLimiterDuration,
Capacity: rateLimiterBucketSize,
},
}

log.Infof("Starting event reporter server with grpc transport %v", useGrpc)
Expand Down Expand Up @@ -217,7 +227,10 @@ func NewCommand() *cobra.Command {
command.Flags().StringVar(&codefreshToken, "codefresh-token", env.StringFromEnv("CODEFRESH_TOKEN", ""), "Codefresh token")
command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvEventReporterShardingAlgorithm, common.DefaultEventReporterShardingAlgorithm), "Enables choice of sharding method. Supported sharding methods are : [legacy] ")
command.Flags().StringSliceVar(&applicationNamespaces, "application-namespaces", env.StringsFromEnv("ARGOCD_APPLICATION_NAMESPACES", []string{}, ","), "List of additional namespaces where application resources can be managed in")
command.Flags().BoolVar(&useGrpc, "grpc", env.ParseBoolFromEnv("USE_GRPC", true), "Use grpc for interact with argocd server")
command.Flags().BoolVar(&useGrpc, "grpc", env.ParseBoolFromEnv("USE_GRPC", false), "Use grpc for interact with argocd server")
command.Flags().BoolVar(&rateLimiterEnabled, "rate-limiter-enabled", env.ParseBoolFromEnv("RATE_LIMITER_ENABLED", false), "Use rate limiter for prevent queue to be overflowed")
command.Flags().IntVar(&rateLimiterBucketSize, "rate-limiter-bucket-size", env.ParseNumFromEnv("RATE_LIMITER_BUCKET_SIZE", math.MaxInt, 0, math.MaxInt), "The maximum amount of requests allowed per window.")
command.Flags().DurationVar(&rateLimiterDuration, "rate-limiter-period", env.ParseDurationFromEnv("RATE_LIMITER_DURATION", 24*time.Hour, 0, math.MaxInt64), "The rate limit window size.")
cacheSrc = servercache.AddCacheFlagsToCmd(command, func(client *redis.Client) {
redisClient = client
})
Expand Down
4 changes: 2 additions & 2 deletions event_reporter/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ type eventReporterController struct {
metricsServer *metrics.MetricsServer
}

func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, featureManager *reporter.FeatureManager) EventReporterController {
appBroadcaster := reporter.NewBroadcaster(featureManager, metricsServer)
func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, featureManager *reporter.FeatureManager, rateLimiterOpts *reporter.RateLimiterOpts) EventReporterController {
appBroadcaster := reporter.NewBroadcaster(featureManager, metricsServer, rateLimiterOpts)
appInformer.AddEventHandler(appBroadcaster)
return &eventReporterController{
appBroadcaster: appBroadcaster,
Expand Down
20 changes: 14 additions & 6 deletions event_reporter/reporter/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@ import (
argocommon "github.com/argoproj/argo-cd/v2/common"
"github.com/argoproj/argo-cd/v2/event_reporter/metrics"
"github.com/argoproj/argo-cd/v2/event_reporter/sharding"
appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/util/env"
"math"
"sync"

log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/watch"

appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"math"
"sync"
)

type subscriber struct {
Expand Down Expand Up @@ -42,15 +40,17 @@ type broadcasterHandler struct {
filter sharding.ApplicationFilterFunction
featureManager *FeatureManager
metricsServer *metrics.MetricsServer
rateLimiter *RateLimiter
}

func NewBroadcaster(featureManager *FeatureManager, metricsServer *metrics.MetricsServer) Broadcaster {
func NewBroadcaster(featureManager *FeatureManager, metricsServer *metrics.MetricsServer, rateLimiterOpts *RateLimiterOpts) Broadcaster {
// todo: pass real value here
filter := getApplicationFilter("")
return &broadcasterHandler{
filter: filter,
featureManager: featureManager,
metricsServer: metricsServer,
rateLimiter: NewRateLimiter(rateLimiterOpts),
}
}

Expand All @@ -77,6 +77,14 @@ func (b *broadcasterHandler) notify(event *appv1.ApplicationWatchEvent) {

for _, s := range subscribers {
if s.matches(event) {

duration, err := b.rateLimiter.Limit(event.Application.Name)
if err != nil {
log.Infof("adding application '%s' to channel failed, due to rate limit, duration left %s", event.Application.Name, duration.String())
b.metricsServer.IncAppEventsCounter(event.Application.Name, false)
continue
}

select {
case s.ch <- event:
{
Expand Down
36 changes: 36 additions & 0 deletions event_reporter/reporter/rate_limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package reporter

import (
"context"
"github.com/mennanov/limiters"
"time"
)

type RateLimiterOpts struct {
Enabled bool
Rate time.Duration
Capacity int
}

type RateLimiter struct {
opts *RateLimiterOpts
limiters map[string]*limiters.FixedWindow
}

func NewRateLimiter(opts *RateLimiterOpts) *RateLimiter {
return &RateLimiter{opts: opts, limiters: make(map[string]*limiters.FixedWindow)}
}

func (rl *RateLimiter) Limit(applicationName string) (time.Duration, error) {
if !rl.opts.Enabled {
return time.Duration(0), nil
}

limiter := rl.limiters[applicationName]
if limiter == nil {
limiter = limiters.NewFixedWindow(int64(rl.opts.Capacity), rl.opts.Rate, limiters.NewFixedWindowInMemory(), limiters.NewSystemClock())
rl.limiters[applicationName] = limiter
}

return limiter.Limit(context.Background())
}
46 changes: 46 additions & 0 deletions event_reporter/reporter/rate_limiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package reporter

import (
"testing"
"time"
)

func TestRateLimiter(t *testing.T) {
t.Run("Limiter is turned off", func(t *testing.T) {
rl := NewRateLimiter(&RateLimiterOpts{
Enabled: false,
})
d, err := rl.Limit("foo")
if err != nil {
t.Errorf("Expected no error, got %v", err)
}
if d != 0 {
t.Errorf("Expected 0 duration, got %v", d)
}
})
t.Run("Limiter is turned on", func(t *testing.T) {
rl := NewRateLimiter(&RateLimiterOpts{
Enabled: true,
Rate: time.Second,
Capacity: 1,
})
d, err := rl.Limit("foo")
if err != nil {
t.Errorf("Expected no error, got %v", err)
}
if d != 0 {
t.Errorf("Expected 0 duration, got %v", d)
}
})
t.Run("Limiter is turned on but with 0 capacity", func(t *testing.T) {
rl := NewRateLimiter(&RateLimiterOpts{
Enabled: true,
Rate: time.Second,
Capacity: 0,
})
_, err := rl.Limit("foo")
if err == nil {
t.Errorf("Expected error, got nil")
}
})
}
3 changes: 2 additions & 1 deletion event_reporter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type EventReporterServerOpts struct {
BaseHRef string
RootPath string
CodefreshConfig *codefresh.CodefreshConfig
RateLimiterOpts *reporter.RateLimiterOpts
}

type handlerSwitcher struct {
Expand Down Expand Up @@ -152,7 +153,7 @@ func (a *EventReporterServer) Init(ctx context.Context) {
}

func (a *EventReporterServer) RunController(ctx context.Context) {
controller := event_reporter.NewEventReporterController(a.appInformer, a.Cache, a.settingsMgr, a.ApplicationServiceClient, a.appLister, a.CodefreshConfig, a.serviceSet.MetricsServer, a.featureManager)
controller := event_reporter.NewEventReporterController(a.appInformer, a.Cache, a.settingsMgr, a.ApplicationServiceClient, a.appLister, a.CodefreshConfig, a.serviceSet.MetricsServer, a.featureManager, a.RateLimiterOpts)
go controller.Run(ctx)
}

Expand Down
52 changes: 40 additions & 12 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ require (
github.com/ktrysmt/go-bitbucket v0.9.67
github.com/mattn/go-isatty v0.0.19
github.com/mattn/go-zglob v0.0.4
github.com/mennanov/limiters v1.2.3
github.com/microsoft/azure-devops-go-api/azuredevops v1.0.0-b5
github.com/olekukonko/tablewriter v0.0.5
github.com/patrickmn/go-cache v2.1.0+incompatible
Expand Down Expand Up @@ -83,7 +84,7 @@ require (
golang.org/x/sync v0.3.0
golang.org/x/term v0.13.0
google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc
google.golang.org/grpc v1.56.2
google.golang.org/grpc v1.56.3
google.golang.org/protobuf v1.31.0
gopkg.in/square/go-jose.v2 v2.6.0
gopkg.in/yaml.v2 v2.4.0
Expand Down Expand Up @@ -111,25 +112,52 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.2 // indirect
github.com/aws/aws-sdk-go-v2 v1.17.3 // indirect
github.com/aws/aws-sdk-go-v2/config v1.18.8 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.8 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.27 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.28 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.21 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/aws/aws-sdk-go-v2 v1.17.6 // indirect
github.com/aws/aws-sdk-go-v2/config v1.18.17 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.17 // indirect
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.10.18 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.30 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.24 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.31 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.19.1 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.14.6 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.24 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.24 // indirect
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.0 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.18.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.5 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.5 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.18.6 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/coreos/go-semver v0.3.1 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/fatih/color v1.14.1 // indirect
github.com/go-redsync/redsync/v4 v4.8.1 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/hashicorp/consul/api v1.18.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-hclog v1.4.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/serf v0.10.1 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414 // indirect
github.com/tidwall/gjson v1.14.4 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
go.etcd.io/etcd/api/v3 v3.5.7 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.7 // indirect
go.etcd.io/etcd/client/v3 v3.5.7 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect
gopkg.in/retry.v1 v1.0.3 // indirect
Expand Down
Loading
Loading