Skip to content

Commit

Permalink
Add per-IP rate limiters
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 committed Nov 17, 2024
1 parent 75fdbaa commit 2aacb58
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 9 deletions.
19 changes: 11 additions & 8 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@ import (
"github.com/labstack/echo/v4"
"go.opentelemetry.io/otel"
"golang.org/x/sync/semaphore"
"golang.org/x/time/rate"
)

type Server struct {
Subscribers map[int64]*Subscriber
lk sync.RWMutex
nextSub int64
Consumer *consumer.Consumer
maxSubRate float64
seq int64
Subscribers map[int64]*Subscriber
lk sync.RWMutex
nextSub int64
Consumer *consumer.Consumer
maxSubRate float64
seq int64
perIPLimiters map[string]*rate.Limiter
}

var upgrader = websocket.Upgrader{
Expand All @@ -40,8 +42,9 @@ var tracer = otel.Tracer("jetstream-server")

func NewServer(maxSubRate float64) (*Server, error) {
s := Server{
Subscribers: make(map[int64]*Subscriber),
maxSubRate: maxSubRate,
Subscribers: make(map[int64]*Subscriber),
maxSubRate: maxSubRate,
perIPLimiters: make(map[string]*rate.Limiter),
}

return &s, nil
Expand Down
8 changes: 7 additions & 1 deletion pkg/server/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ func (s *Server) AddSubscriber(ws *websocket.Conn, realIP string, opts *Subscrib
s.lk.Lock()
defer s.lk.Unlock()

lim := s.perIPLimiters[realIP]
if lim == nil {
lim = rate.NewLimiter(rate.Limit(s.maxSubRate), int(s.maxSubRate))
s.perIPLimiters[realIP] = lim
}

sub := Subscriber{
ws: ws,
realIP: realIP,
Expand All @@ -207,7 +213,7 @@ func (s *Server) AddSubscriber(ws *websocket.Conn, realIP string, opts *Subscrib
compress: opts.Compress,
deliveredCounter: eventsDelivered.WithLabelValues(realIP),
bytesCounter: bytesDelivered.WithLabelValues(realIP),
rl: rate.NewLimiter(rate.Limit(s.maxSubRate), int(s.maxSubRate)),
rl: lim,
}

s.Subscribers[s.nextSub] = &sub
Expand Down

0 comments on commit 2aacb58

Please sign in to comment.