Skip to content

Commit

Permalink
Merge pull request #451 from forta-protocol/rate-limit-agents
Browse files Browse the repository at this point in the history
Limit the rate of JSON-RPC requests from agents
  • Loading branch information
canercidam authored Mar 7, 2022
2 parents 94bae63 + c3923fd commit 35802d3
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 2 deletions.
8 changes: 7 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,14 @@ type TraceConfig struct {
Enabled bool `yaml:"enabled" json:"enabled"`
}

type RateLimitConfig struct {
Rate int `yaml:"rate" json:"rate" validate:"min=1"`
Burst int `yaml:"burst" json:"burst" validate:"min=1"`
}

type JsonRpcProxyConfig struct {
JsonRpc JsonRpcConfig `yaml:"jsonRpc" json:"jsonRpc"`
JsonRpc JsonRpcConfig `yaml:"jsonRpc" json:"jsonRpc"`
RateLimitConfig RateLimitConfig `yaml:"rateLimit" json:"rateLimit" default:"{\"rate\":50,\"burst\":50}"`
}

type LogConfig struct {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
github.com/spf13/viper v1.8.1
github.com/stretchr/testify v1.7.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65
google.golang.org/grpc v1.44.0
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -949,8 +949,9 @@ golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 h1:M73Iuj3xbbb9Uk1DYhzydthsj6oOd6l9bpuFcNoUvTs=
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
10 changes: 10 additions & 0 deletions services/json-rpc/json_rpc_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type JsonRpcProxy struct {
agentConfigs []config.AgentConfig
agentConfigMu sync.RWMutex

rateLimiter *RateLimiter

lastErr health.ErrorTracker
}

Expand Down Expand Up @@ -79,6 +81,10 @@ func (p *JsonRpcProxy) metricHandler(h http.Handler) http.Handler {
duration := time.Since(t)
agentConfig, ok := p.findAgentFromRemoteAddr(req.RemoteAddr)
if ok {
if shouldLimitAgent := p.rateLimiter.CheckLimit(agentConfig.ID); shouldLimitAgent {
w.WriteHeader(http.StatusTooManyRequests)
return
}
p.msgClient.PublishProto(messaging.SubjectMetricAgent, &protocol.AgentMetricList{
Metrics: metrics.GetJSONRPCMetrics(*agentConfig, t, duration),
})
Expand Down Expand Up @@ -186,5 +192,9 @@ func NewJsonRpcProxy(ctx context.Context, cfg config.Config) (*JsonRpcProxy, err
cfg: jCfg,
dockerClient: globalClient,
msgClient: msgClient,
rateLimiter: NewRateLimiter(
cfg.JsonRpcProxy.RateLimitConfig.Rate,
cfg.JsonRpcProxy.RateLimitConfig.Burst,
),
}, nil
}
68 changes: 68 additions & 0 deletions services/json-rpc/rate_limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package json_rpc

import (
"sync"
"time"

log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
)

// RateLimiter rate limits requests.
type RateLimiter struct {
rate int
burst int
clientLimiters map[string]*clientLimiter
mu sync.Mutex
}

type clientLimiter struct {
lastReservation time.Time
*rate.Limiter
}

// NewRateLimiter creates a new rate limiter.
func NewRateLimiter(rateN, burst int) *RateLimiter {
if rateN <= 0 || burst <= 0 {
log.Panic("non-positive rate limiter arg")
}
rl := &RateLimiter{
rate: rateN,
burst: burst,
clientLimiters: make(map[string]*clientLimiter),
}
go rl.autoCleanup()
return rl
}

// CheckLimit tries adding a request to the limiting channel and returns boolean to signal
// if we hit the rate limit.
func (rl *RateLimiter) CheckLimit(clientID string) bool {
return rl.reserveClient(clientID).Delay() > 0
}

func (rl *RateLimiter) reserveClient(clientID string) *rate.Reservation {
rl.mu.Lock()
defer rl.mu.Unlock()
limiter := rl.clientLimiters[clientID]
if limiter == nil {
limiter = &clientLimiter{Limiter: rate.NewLimiter(rate.Limit(rl.rate), rl.burst)}
rl.clientLimiters[clientID] = limiter
}
limiter.lastReservation = time.Now()
return limiter.Reserve()
}

// deallocate inactive limiters
func (rl *RateLimiter) autoCleanup() {
ticker := time.NewTicker(time.Hour)
for range ticker.C {
rl.mu.Lock()
for clientID, limiter := range rl.clientLimiters {
if time.Since(limiter.lastReservation) > time.Minute*10 {
delete(rl.clientLimiters, clientID)
}
}
rl.mu.Unlock()
}
}
19 changes: 19 additions & 0 deletions services/json-rpc/rate_limiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package json_rpc_test

import (
"testing"

json_rpc "github.com/forta-protocol/forta-node/services/json-rpc"
"github.com/stretchr/testify/require"
)

const testClientID = "1"

func TestRateLimiting(t *testing.T) {
r := require.New(t)
rateLimiter := json_rpc.NewRateLimiter(1, 1)
reachedLimit := rateLimiter.CheckLimit(testClientID)
r.False(reachedLimit)
reachedLimit = rateLimiter.CheckLimit(testClientID)
r.True(reachedLimit)
}

0 comments on commit 35802d3

Please sign in to comment.