Pool handler maps (#2)
The previous behavior was to always keep 2 maps, one being written to by
each call to `Notify` while the other gets drained by `send`. On paper,
this approach seems fine since it stands to reason that at steady state,
the client will receive a similar number of resources per response,
meaning the two maps will be roughly equally sized and shouldn't require
any additional allocations. However, during startup, the client will
likely resubmit many subscriptions at once, and the initial response will
likely be quite large. Because these maps are never reclaimed, they then
simply serve as bloat, with a significant memory footprint (see attached
heap dump). This is made worse by the fact that, after the initial
resource dump, these large maps sit severely underused once the
connection becomes idle, except for the occasional update.


![image](https://github.com/user-attachments/assets/66e19bce-9239-4321-8ce1-4f4d0a6cee43)
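The double-map design described above can be sketched roughly like this; `doubleBuffer`, `entry`, and the method names here are hypothetical simplifications for illustration, not the actual diderot types:

```go
package main

import "fmt"

// entry is a simplified stand-in for the real per-resource entry type
// (hypothetical field, for illustration only).
type entry struct{ version int }

// doubleBuffer sketches the pre-change design: two maps are kept for the
// lifetime of the handler. notify writes into one while the other is
// drained for sending, and the two are swapped on each flush. Neither
// map's backing storage is ever released, so a large initial burst of
// resources keeps its full footprint even once the connection goes idle.
type doubleBuffer struct {
	active, spare map[string]entry
}

func newDoubleBuffer() *doubleBuffer {
	return &doubleBuffer{
		active: map[string]entry{},
		spare:  map[string]entry{},
	}
}

// notify records an update in the currently active map.
func (b *doubleBuffer) notify(name string, e entry) {
	b.active[name] = e
}

// swap hands the accumulated entries to the caller for sending and makes
// the previously drained (and cleared) spare map the new active one. The
// caller is expected to clear the returned map after sending so it can
// serve as the next spare.
func (b *doubleBuffer) swap() map[string]entry {
	out := b.active
	b.active = b.spare
	b.spare = out
	return out
}

func main() {
	b := newDoubleBuffer()
	b.notify("listener/a", entry{version: 1})
	b.notify("listener/b", entry{version: 2})
	out := b.swap()
	fmt.Printf("drained %d entries, active now holds %d\n", len(out), len(b.active))
}
```

Note that both maps survive forever at whatever capacity the largest burst grew them to, which is exactly the bloat visible in the heap dump.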

With this new approach, all handlers share maps from a pool, which limits
the overall allocation rate while also reducing the footprint of each
idle connection, since a map is only acquired from the pool on the first
notification. This means clients that do not receive frequent updates
will not hold on to any memory at all.
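The pooled design can be sketched as follows. The type and method names are again simplified stand-ins; only `entryMapPool` and the nil-until-first-notification behavior mirror the actual change:

```go
package main

import (
	"fmt"
	"sync"
)

// entry is a simplified stand-in for the real per-resource entry type
// (hypothetical field, for illustration only).
type entry struct{ version int }

// entryMapPool mirrors the pool introduced by this change: all handlers
// draw their maps from here instead of each retaining two forever.
var entryMapPool = sync.Pool{New: func() any {
	return make(map[string]entry)
}}

// pooledHandler sketches the new design: entries stays nil while the
// connection is idle, and a map is acquired only on the first notification.
type pooledHandler struct {
	entries map[string]entry
}

func (h *pooledHandler) notify(name string, e entry) {
	if h.entries == nil {
		h.entries = entryMapPool.Get().(map[string]entry)
	}
	h.entries[name] = e
}

// flush swaps entries back to nil, sends the accumulated map, then clears
// it and returns it to the pool so another handler can reuse its storage.
func (h *pooledHandler) flush(send func(map[string]entry)) {
	out := h.entries
	h.entries = nil
	if out == nil {
		return // idle handler: nothing accumulated, no map held
	}
	send(out)
	clear(out) // clear (Go 1.21+) empties the map but keeps its storage
	entryMapPool.Put(out)
}

func main() {
	h := &pooledHandler{}
	h.notify("listener/a", entry{version: 1})
	h.flush(func(m map[string]entry) {
		fmt.Printf("sending %d entries\n", len(m))
	})
	fmt.Println("idle handler holds a map:", h.entries != nil)
}
```

After a flush the handler holds no map at all, so an idle connection's steady-state footprint is just the struct itself.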

This seems to have negligible performance impact under stress:
```
% benchstat old new
goos: darwin
goarch: arm64
pkg: github.com/linkedin/diderot/internal/server
cpu: Apple M1 Pro
                      │     old      │                 new                 │
                      │    sec/op    │   sec/op     vs base                │
Handlers/____1_subs-8   125.8n ± 12%   125.2n ± 9%        ~ (p=0.670 n=10)
Handlers/___10_subs-8   1.965µ ±  3%   1.921µ ± 3%   -2.21% (p=0.030 n=10)
Handlers/__100_subs-8   20.72µ ±  5%   20.74µ ± 4%        ~ (p=0.869 n=10)
Handlers/_1000_subs-8   221.4µ ± 15%   243.8µ ± 8%  +10.10% (p=0.029 n=10)
Handlers/10000_subs-8   2.491m ±  5%   2.591m ± 4%        ~ (p=0.165 n=10)
geomean                 19.51µ         19.94µ        +2.22%

                      │     old      │                  new                  │
                      │     B/op     │     B/op      vs base                 │
Handlers/____1_subs-8     128.0 ± 0%     128.0 ± 0%       ~ (p=1.000 n=10) ¹
Handlers/___10_subs-8   1.250Ki ± 0%   1.254Ki ± 0%  +0.31% (p=0.000 n=10)
Handlers/__100_subs-8   12.50Ki ± 0%   12.70Ki ± 0%  +1.57% (p=0.000 n=10)
Handlers/_1000_subs-8   125.4Ki ± 0%   132.5Ki ± 1%  +5.67% (p=0.000 n=10)
Handlers/10000_subs-8   1.258Mi ± 0%   1.334Mi ± 2%  +5.98% (p=0.000 n=10)
geomean                 12.58Ki        12.92Ki       +2.67%
¹ all samples are equal

                      │     old     │                 new                  │
                      │  allocs/op  │  allocs/op   vs base                 │
Handlers/____1_subs-8    1.000 ± 0%    1.000 ± 0%       ~ (p=1.000 n=10) ¹
Handlers/___10_subs-8    10.00 ± 0%    10.00 ± 0%       ~ (p=1.000 n=10) ¹
Handlers/__100_subs-8    100.0 ± 0%    100.0 ± 0%       ~ (p=1.000 n=10) ¹
Handlers/_1000_subs-8   1.002k ± 0%   1.003k ± 0%  +0.10% (p=0.005 n=10)
Handlers/10000_subs-8   10.23k ± 0%   10.24k ± 0%       ~ (p=0.402 n=10)
geomean                  100.5         100.5       +0.05%
¹ all samples are equal
```
PapaCharlie authored Sep 18, 2024
1 parent 24afde2 commit d1254dc
Showing 2 changed files with 19 additions and 13 deletions.
4 changes: 2 additions & 2 deletions Makefile
@@ -1,6 +1,6 @@
PACKAGE = github.com/linkedin/diderot
SOURCE_FILES = $(wildcard $(shell git ls-files))
PROFILES = out
PROFILES = $(PWD)/out
COVERAGE = $(PROFILES)/diderot.cov
GOBIN = $(shell go env GOPATH)/bin

@@ -47,7 +47,7 @@ profile_cache:
$(MAKE) -B $(PROFILES)/BenchmarkCacheThroughput.bench BENCH_PKG=./cache

profile_handlers:
$(MAKE) -B $(PROFILES)/BenchmarkHandlers.bench BENCH_PKG=./server
$(MAKE) -B $(PROFILES)/BenchmarkHandlers.bench BENCH_PKG=./internal/server

BENCHCOUNT = 1
BENCHTIME = 1s
28 changes: 17 additions & 11 deletions internal/server/handlers.go
@@ -45,7 +45,6 @@ func newHandler(
ctx: ctx,
ignoreDeletes: ignoreDeletes,
send: send,
entries: map[string]entry{},
immediateNotificationReceived: newNotifyOnceChan(),
notificationReceived: newNotifyOnceChan(),
}
@@ -85,6 +84,10 @@ func (ch notifyOnceChan) reset() {
}
}

var entryMapPool = sync.Pool{New: func() any {
return make(map[string]entry)
}}

// handler implements the BatchSubscriptionHandler interface using a backing map to aggregate updates
// as they come in, and flushing them out, according to when the limiter permits it.
type handler struct {
@@ -106,7 +109,7 @@ type handler struct {
// then checks whether immediateNotificationReceived has been signaled, and if so skips the granular
// rate limiter. Otherwise, it either waits for the granular rate limit to clear, or
// immediateNotificationReceived to be signaled, whichever comes first. Only then does it invoke
// swapResourceMaps which resets notificationReceived, immediateNotificationReceived and entries to a
// swapEntries which resets notificationReceived, immediateNotificationReceived and entries to a
// state where they can receive more notifications while, in the background, it invokes send with all
// accumulated entries up to this point. Once send completes, it returns to waiting on
// notificationReceived. All operations involving these channels will exit early if ctx is cancelled,
@@ -119,20 +122,19 @@ type handler struct {
batchStarted bool
}

// swapResourceMaps sets entries to the given map and returns the original value of h.resources and
// resets immediateNotificationReceived and notificationReceived.
func (h *handler) swapResourceMaps(entries map[string]entry) map[string]entry {
// swapEntries grabs the lock then swaps the entries map to a nil map. It resets notificationReceived
// and immediateNotificationReceived, and returns original entries map that was swapped.
func (h *handler) swapEntries() map[string]entry {
h.lock.Lock()
defer h.lock.Unlock()
entries, h.entries = h.entries, entries
entries := h.entries
h.entries = nil
h.notificationReceived.reset()
h.immediateNotificationReceived.reset()
return entries
}

func (h *handler) loop() {
entries := map[string]entry{}

for {
select {
case <-h.ctx.Done():
@@ -148,14 +150,14 @@ func (h *handler) loop() {
}
}

entries = h.swapResourceMaps(entries)

entries := h.swapEntries()
if err := h.send(entries); err != nil {
return
}

// TODO: have an admin UI that shows which clients are lagging the most
// Return the used map to the pool after clearing it.
clear(entries)
entryMapPool.Put(entries)
}
}

@@ -225,6 +227,10 @@ func (h *handler) Notify(name string, r *ads.RawResource, metadata ads.Subscript
return
}

if h.entries == nil {
h.entries = entryMapPool.Get().(map[string]entry)
}

h.entries[name] = entry{
Resource: r,
metadata: metadata,
