Skip to content

Commit

Permalink
Refactor timecache implementations (#523)
Browse files Browse the repository at this point in the history
* reimplement timecache for sane and performant behaviour

* remove seenMessagesMx, take advantage of new tc api

* fix timecache tests

* fix typo

* store expiry, don't make life difficult

* refactor common background sweep procedure for both impls

* add godocs to TimeCache
  • Loading branch information
vyzo authored Feb 21, 2023
1 parent 3dbc2fd commit 56c0e6c
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 130 deletions.
13 changes: 2 additions & 11 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ type PubSub struct {
inboundStreamsMx sync.Mutex
inboundStreams map[peer.ID]network.Stream

seenMessagesMx sync.Mutex
seenMessages timecache.TimeCache
seenMsgTTL time.Duration
seenMsgStrategy timecache.Strategy
Expand Down Expand Up @@ -567,6 +566,7 @@ func (p *PubSub) processLoop(ctx context.Context) {
}
p.peers = nil
p.topics = nil
p.seenMessages.Done()
}()

for {
Expand Down Expand Up @@ -985,22 +985,13 @@ func (p *PubSub) notifySubs(msg *Message) {

// seenMessage returns whether we already saw this message before
func (p *PubSub) seenMessage(id string) bool {
p.seenMessagesMx.Lock()
defer p.seenMessagesMx.Unlock()
return p.seenMessages.Has(id)
}

// markSeen marks a message as seen such that seenMessage returns `true' for the given id
// returns true if the message was freshly marked
func (p *PubSub) markSeen(id string) bool {
p.seenMessagesMx.Lock()
defer p.seenMessagesMx.Unlock()
if p.seenMessages.Has(id) {
return false
}

p.seenMessages.Add(id)
return true
return p.seenMessages.Add(id)
}

// subscribedToMessage returns whether we are subscribed to one of the topics
Expand Down
82 changes: 33 additions & 49 deletions timecache/first_seen_cache.go
Original file line number Diff line number Diff line change
@@ -1,72 +1,56 @@
package timecache

import (
"container/list"
"context"
"sync"
"time"
)

// FirstSeenCache is a thread-safe copy of https://github.com/whyrusleeping/timecache.
// FirstSeenCache is a time cache that only marks the expiry of a message when first added.
type FirstSeenCache struct {
q *list.List
m map[string]time.Time
span time.Duration
guard *sync.RWMutex
}
lk sync.RWMutex
m map[string]time.Time
ttl time.Duration

func newFirstSeenCache(span time.Duration) TimeCache {
return &FirstSeenCache{
q: list.New(),
m: make(map[string]time.Time),
span: span,
guard: new(sync.RWMutex),
}
done func()
}

func (tc FirstSeenCache) Add(s string) {
tc.guard.Lock()
defer tc.guard.Unlock()
var _ TimeCache = (*FirstSeenCache)(nil)

_, ok := tc.m[s]
if ok {
log.Debug("first-seen: got same entry")
return
func newFirstSeenCache(ttl time.Duration) *FirstSeenCache {
tc := &FirstSeenCache{
m: make(map[string]time.Time),
ttl: ttl,
}

// TODO(#515): Do GC in the background
tc.sweep()
ctx, done := context.WithCancel(context.Background())
tc.done = done
go background(ctx, &tc.lk, tc.m)

tc.m[s] = time.Now()
tc.q.PushFront(s)
return tc
}

func (tc FirstSeenCache) sweep() {
for {
back := tc.q.Back()
if back == nil {
return
}
func (tc *FirstSeenCache) Done() {
tc.done()
}

v := back.Value.(string)
t, ok := tc.m[v]
if !ok {
panic("inconsistent cache state")
}
func (tc *FirstSeenCache) Has(s string) bool {
tc.lk.RLock()
defer tc.lk.RUnlock()

if time.Since(t) > tc.span {
tc.q.Remove(back)
delete(tc.m, v)
} else {
return
}
}
_, ok := tc.m[s]
return ok
}

func (tc FirstSeenCache) Has(s string) bool {
tc.guard.RLock()
defer tc.guard.RUnlock()
func (tc *FirstSeenCache) Add(s string) bool {
tc.lk.Lock()
defer tc.lk.Unlock()

_, ok := tc.m[s]
if ok {
return false
}

ts, ok := tc.m[s]
// Only consider the entry found if it was present in the cache AND hadn't already expired.
return ok && time.Since(ts) <= tc.span
tc.m[s] = time.Now().Add(tc.ttl)
return true
}
9 changes: 7 additions & 2 deletions timecache/first_seen_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,27 @@ func TestFirstSeenCacheFound(t *testing.T) {
}

func TestFirstSeenCacheExpire(t *testing.T) {
backgroundSweepInterval = time.Second

tc := newFirstSeenCache(time.Second)
for i := 0; i < 11; i++ {
for i := 0; i < 10; i++ {
tc.Add(fmt.Sprint(i))
time.Sleep(time.Millisecond * 100)
}

time.Sleep(2 * time.Second)
if tc.Has(fmt.Sprint(0)) {
t.Fatal("should have dropped this from the cache already")
}
}

func TestFirstSeenCacheNotFoundAfterExpire(t *testing.T) {
backgroundSweepInterval = time.Second

tc := newFirstSeenCache(time.Second)
tc.Add(fmt.Sprint(0))
time.Sleep(1100 * time.Millisecond)

time.Sleep(2 * time.Second)
if tc.Has(fmt.Sprint(0)) {
t.Fatal("should have dropped this from the cache already")
}
Expand Down
94 changes: 34 additions & 60 deletions timecache/last_seen_cache.go
Original file line number Diff line number Diff line change
@@ -1,84 +1,58 @@
package timecache

import (
"context"
"sync"
"time"

"github.com/emirpasic/gods/maps/linkedhashmap"
)

// LastSeenCache is a LRU cache that keeps entries for up to a specified time duration. After this duration has elapsed,
// "old" entries will be purged from the cache.
//
// It's also a "sliding window" cache. Every time an unexpired entry is seen again, its timestamp slides forward. This
// keeps frequently occurring entries cached and prevents them from being propagated, especially because of network
// issues that might increase the number of duplicate messages in the network.
//
// Garbage collection of expired entries is event-driven, i.e. it only happens when there is a new entry added to the
// cache. This should be ok - if existing entries are being looked up then the cache is not growing, and when a new one
// appears that would grow the cache, garbage collection will attempt to reduce the pressure on the cache.
//
// This implementation is heavily inspired by https://github.com/whyrusleeping/timecache.
// LastSeenCache is a time cache that extends the expiry of a seen message when added
// or checked for presence with Has..
type LastSeenCache struct {
m *linkedhashmap.Map
span time.Duration
guard *sync.Mutex
lk sync.Mutex
m map[string]time.Time
ttl time.Duration

done func()
}

func newLastSeenCache(span time.Duration) TimeCache {
return &LastSeenCache{
m: linkedhashmap.New(),
span: span,
guard: new(sync.Mutex),
var _ TimeCache = (*LastSeenCache)(nil)

func newLastSeenCache(ttl time.Duration) *LastSeenCache {
tc := &LastSeenCache{
m: make(map[string]time.Time),
ttl: ttl,
}
}

func (tc *LastSeenCache) Add(s string) {
tc.guard.Lock()
defer tc.guard.Unlock()
ctx, done := context.WithCancel(context.Background())
tc.done = done
go background(ctx, &tc.lk, tc.m)

tc.add(s)
return tc
}

// Garbage collect expired entries
// TODO(#515): Do GC in the background
tc.gc()
func (tc *LastSeenCache) Done() {
tc.done()
}

func (tc *LastSeenCache) add(s string) {
// We don't need a lock here because this function is always called with the lock already acquired.
func (tc *LastSeenCache) Add(s string) bool {
tc.lk.Lock()
defer tc.lk.Unlock()

// If an entry already exists, remove it and add a new one to the back of the list to maintain temporal ordering and
// an accurate sliding window.
tc.m.Remove(s)
now := time.Now()
tc.m.Put(s, &now)
}
_, ok := tc.m[s]
tc.m[s] = time.Now().Add(tc.ttl)

func (tc *LastSeenCache) gc() {
// We don't need a lock here because this function is always called with the lock already acquired.
iter := tc.m.Iterator()
for iter.Next() {
key := iter.Key()
ts := iter.Value().(*time.Time)
// Exit if we've found an entry with an unexpired timestamp. Since we're iterating in order of insertion, all
// entries hereafter will be unexpired.
if time.Since(*ts) <= tc.span {
return
}
tc.m.Remove(key)
}
return !ok
}

func (tc *LastSeenCache) Has(s string) bool {
tc.guard.Lock()
defer tc.guard.Unlock()
tc.lk.Lock()
defer tc.lk.Unlock()

// If the entry exists and has not already expired, slide it forward.
if ts, found := tc.m.Get(s); found {
if t := ts.(*time.Time); time.Since(*t) <= tc.span {
tc.add(s)
return true
}
_, ok := tc.m[s]
if ok {
tc.m[s] = time.Now().Add(tc.ttl)
}
return false

return ok
}
8 changes: 7 additions & 1 deletion timecache/last_seen_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@ func TestLastSeenCacheFound(t *testing.T) {
}

func TestLastSeenCacheExpire(t *testing.T) {
backgroundSweepInterval = time.Second
tc := newLastSeenCache(time.Second)
for i := 0; i < 11; i++ {
tc.Add(fmt.Sprint(i))
time.Sleep(time.Millisecond * 100)
}

time.Sleep(2 * time.Second)
if tc.Has(fmt.Sprint(0)) {
t.Fatal("should have dropped this from the cache already")
}
}

func TestLastSeenCacheSlideForward(t *testing.T) {
t.Skip("timing is too fine grained to run in CI")

tc := newLastSeenCache(time.Second)
i := 0

Expand Down Expand Up @@ -74,10 +78,12 @@ func TestLastSeenCacheSlideForward(t *testing.T) {
}

func TestLastSeenCacheNotFoundAfterExpire(t *testing.T) {
backgroundSweepInterval = time.Second

tc := newLastSeenCache(time.Second)
tc.Add(fmt.Sprint(0))
time.Sleep(1100 * time.Millisecond)

time.Sleep(2 * time.Second)
if tc.Has(fmt.Sprint(0)) {
t.Fatal("should have dropped this from the cache already")
}
Expand Down
28 changes: 21 additions & 7 deletions timecache/time_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,45 @@ import (

var log = logger.Logger("pubsub/timecache")

// Stategy is the TimeCache expiration strategy to use.
type Strategy uint8

const (
// Strategy_FirstSeen expires an entry from the time it was added.
Strategy_FirstSeen Strategy = iota
// Stategy_LastSeen expires an entry from the last time it was touched by an Add or Has.
Strategy_LastSeen
)

// TimeCache is a cahe of recently seen messages (by id).
type TimeCache interface {
Add(string)
// Add adds an id into the cache, if it is not already there.
// Returns true if the id was newly added to the cache.
// Depending on the implementation strategy, it may or may not update the expiry of
// an existing entry.
Add(string) bool
// Has checks the cache for the presence of an id.
// Depending on the implementation strategy, it may or may not update the expiry of
// an existing entry.
Has(string) bool
// Done signals that the user is done with this cache, which it may stop background threads
// and relinquish resources.
Done()
}

// NewTimeCache defaults to the original ("first seen") cache implementation
func NewTimeCache(span time.Duration) TimeCache {
return NewTimeCacheWithStrategy(Strategy_FirstSeen, span)
func NewTimeCache(ttl time.Duration) TimeCache {
return NewTimeCacheWithStrategy(Strategy_FirstSeen, ttl)
}

func NewTimeCacheWithStrategy(strategy Strategy, span time.Duration) TimeCache {
func NewTimeCacheWithStrategy(strategy Strategy, ttl time.Duration) TimeCache {
switch strategy {
case Strategy_FirstSeen:
return newFirstSeenCache(span)
return newFirstSeenCache(ttl)
case Strategy_LastSeen:
return newLastSeenCache(span)
return newLastSeenCache(ttl)
default:
// Default to the original time cache implementation
return newFirstSeenCache(span)
return newFirstSeenCache(ttl)
}
}
Loading

0 comments on commit 56c0e6c

Please sign in to comment.