Skip to content

Commit

Permalink
Implement keyed record
Browse files Browse the repository at this point in the history
  • Loading branch information
mgnsk committed Apr 18, 2024
1 parent be04e80 commit 6bf39b7
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 137 deletions.
12 changes: 6 additions & 6 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func (c *Cache[K, V]) Get(key K) (value V, exists bool) {
//
// Range is allowed to modify the cache.
func (c *Cache[K, V]) Range(f func(key K, value V) bool) {
c.backend.Range(func(key K, elem *ringlist.Element[backend.Record[V]]) bool {
return f(key, elem.Value.Value)
c.backend.Range(func(elem *ringlist.Element[backend.Record[K, V]]) bool {
return f(elem.Value.Key, elem.Value.Value)
})
}

Expand Down Expand Up @@ -129,24 +129,24 @@ func (c *Cache[K, V]) Fetch(key K, ttl time.Duration, f func() (V, error)) (valu

// TryFetch is like Fetch but allows the TTL to be returned alongside the value from callback.
func (c *Cache[K, V]) TryFetch(key K, f func() (V, time.Duration, error)) (value V, err error) {
newElem := c.backend.Reserve()
newElem := c.backend.Reserve(key)

if elem, loaded := c.backend.LoadOrStore(key, newElem); loaded {
if elem, loaded := c.backend.LoadOrStore(newElem); loaded {
c.backend.Release(newElem)
return elem.Value.Value, nil
}

defer func() {
if r := recover(); r != nil {
c.backend.Discard(key, newElem)
c.backend.Discard(newElem)

panic(r)
}
}()

value, ttl, err := f()
if err != nil {
c.backend.Discard(key, newElem)
c.backend.Discard(newElem)

var zero V
return zero, err
Expand Down
4 changes: 1 addition & 3 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,7 @@ func TestOverflow(t *testing.T) {
c.LoadOrStore(i, 0, 0)
}

EventuallyTrue(t, func() bool {
return c.Len() == capacity
})
Equal(t, c.Len(), capacity)
}

func TestExpire(t *testing.T) {
Expand Down
131 changes: 57 additions & 74 deletions internal/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@ type Backend[K comparable, V any] struct {
Policy Policy
timer *time.Timer
done chan struct{}
xmap map[K]*ringlist.Element[Record[V]] // map of uninitialized and initialized elements
list ringlist.List[Record[V]] // list of initialized elements
pool sync.Pool // pool of elements
xmap map[K]*ringlist.Element[Record[K, V]] // map of uninitialized and initialized elements
list ringlist.List[Record[K, V]] // list of initialized elements
pool sync.Pool // pool of elements
earliestExpireAt int64
cap int
lastLen int
numDeleted uint64
needRealloc bool
once sync.Once
gcStarted bool
mu sync.Mutex
}

Expand All @@ -36,7 +35,7 @@ func NewBackend[K comparable, V any](capacity int) *Backend[K, V] {
return &Backend[K, V]{
timer: t,
done: make(chan struct{}),
xmap: make(map[K]*ringlist.Element[Record[V]], capacity),
xmap: make(map[K]*ringlist.Element[Record[K, V]], capacity),
cap: capacity,
}
}
Expand All @@ -56,35 +55,35 @@ func (b *Backend[K, V]) Len() int {
}

// Reserve a new uninitialized element.
func (b *Backend[K, V]) Reserve() *ringlist.Element[Record[V]] {
elem, ok := b.pool.Get().(*ringlist.Element[Record[V]])
func (b *Backend[K, V]) Reserve(key K) *ringlist.Element[Record[K, V]] {
elem, ok := b.pool.Get().(*ringlist.Element[Record[K, V]])
if !ok {
elem = ringlist.NewElement(Record[V]{})
elem = ringlist.NewElement(Record[K, V]{})
}

elem.Value.Key = key
elem.Value.wg.Add(1)

return elem
}

// Release a reserved uninitialized element.
func (b *Backend[K, V]) Release(elem *ringlist.Element[Record[V]]) {
defer elem.Value.wg.Done()

func (b *Backend[K, V]) Release(elem *ringlist.Element[Record[K, V]]) {
elem.Value.Key = *new(K)
elem.Value.wg.Done()
b.pool.Put(elem)
}

// Discard a reserved uninitialized element.
func (b *Backend[K, V]) Discard(key K, elem *ringlist.Element[Record[V]]) {
defer elem.Value.wg.Done()

func (b *Backend[K, V]) Discard(elem *ringlist.Element[Record[K, V]]) {
b.mu.Lock()
delete(b.xmap, key)
delete(b.xmap, elem.Value.Key)
elem.Value.wg.Done()
b.mu.Unlock()
}

// Initialize a previously stored uninitialized element.
func (b *Backend[K, V]) Initialize(elem *ringlist.Element[Record[V]], value V, ttl time.Duration) {
func (b *Backend[K, V]) Initialize(elem *ringlist.Element[Record[K, V]], value V, ttl time.Duration) {
b.mu.Lock()
defer b.mu.Unlock()

Expand All @@ -99,9 +98,10 @@ func (b *Backend[K, V]) Initialize(elem *ringlist.Element[Record[V]], value V, t
b.list.PushBack(elem)

if n := b.overflow(); n > 0 {
b.startGCOnce()
b.timer.Reset(0)
} else if elem.Value.deadline > 0 {
b.delete(b.list.Front())
}

if elem.Value.deadline > 0 {
b.startGCOnce()
if b.earliestExpireAt == 0 || elem.Value.deadline < b.earliestExpireAt {
b.earliestExpireAt = elem.Value.deadline
Expand All @@ -110,7 +110,7 @@ func (b *Backend[K, V]) Initialize(elem *ringlist.Element[Record[V]], value V, t
}
}

func (b *Backend[K, V]) hit(elem *ringlist.Element[Record[V]]) {
func (b *Backend[K, V]) hit(elem *ringlist.Element[Record[K, V]]) {
switch b.Policy {
case Default:
case LFU:
Expand All @@ -121,7 +121,7 @@ func (b *Backend[K, V]) hit(elem *ringlist.Element[Record[V]]) {
}

// Load an initialized element.
func (b *Backend[K, V]) Load(key K) (value *ringlist.Element[Record[V]], ok bool) {
func (b *Backend[K, V]) Load(key K) (value *ringlist.Element[Record[K, V]], ok bool) {
b.mu.Lock()
defer b.mu.Unlock()

Expand All @@ -134,10 +134,10 @@ func (b *Backend[K, V]) Load(key K) (value *ringlist.Element[Record[V]], ok bool
}

// LoadOrStore loads or stores an element.
func (b *Backend[K, V]) LoadOrStore(key K, new *ringlist.Element[Record[V]]) (old *ringlist.Element[Record[V]], loaded bool) {
func (b *Backend[K, V]) LoadOrStore(new *ringlist.Element[Record[K, V]]) (old *ringlist.Element[Record[K, V]], loaded bool) {
tryLoadStore:
b.mu.Lock()
if elem, ok := b.xmap[key]; ok {
if elem, ok := b.xmap[new.Value.Key]; ok {
if elem.Value.initialized {
b.hit(elem)
b.mu.Unlock()
Expand All @@ -149,15 +149,15 @@ tryLoadStore:
goto tryLoadStore
}

b.xmap[key] = new
b.xmap[new.Value.Key] = new

b.mu.Unlock()

return new, false
}

// Range iterates over initialized cache elements in no particular order or consistency.
func (b *Backend[K, V]) Range(f func(key K, r *ringlist.Element[Record[V]]) bool) {
func (b *Backend[K, V]) Range(f func(r *ringlist.Element[Record[K, V]]) bool) {
b.mu.Lock()
keys := make([]K, 0, len(b.xmap))
for key := range b.xmap {
Expand All @@ -170,7 +170,7 @@ func (b *Backend[K, V]) Range(f func(key K, r *ringlist.Element[Record[V]]) bool
elem, ok := b.xmap[key]
initialized := ok && elem.Value.initialized
b.mu.Unlock()
if initialized && !f(key, elem) {
if initialized && !f(elem) {
return
}
}
Expand All @@ -184,7 +184,7 @@ func (b *Backend[K, V]) Evict(key K) (V, bool) {
var zero V

if elem, ok := b.xmap[key]; ok && elem.Value.initialized {
b.delete(key, elem)
b.delete(elem)
return elem.Value.Value, true
}

Expand All @@ -199,8 +199,8 @@ func (b *Backend[K, V]) overflow() int {
return 0
}

func (b *Backend[K, V]) delete(key K, elem *ringlist.Element[Record[V]]) {
delete(b.xmap, key)
func (b *Backend[K, V]) delete(elem *ringlist.Element[Record[K, V]]) {
delete(b.xmap, elem.Value.Key)
b.list.Remove(elem)
b.numDeleted++

Expand All @@ -211,73 +211,56 @@ func (b *Backend[K, V]) delete(key K, elem *ringlist.Element[Record[V]]) {
if b.numDeleted > uint64(b.lastLen)/2 {
b.numDeleted = 0
b.lastLen = b.list.Len()
b.needRealloc = true
b.timer.Reset(0)
b.xmap = maps.Clone(b.xmap)
}
}

func (b *Backend[K, V]) startGCOnce() {
b.once.Do(func() {
go func() {
for {
select {
case <-b.done:
b.timer.Stop()
return
case now := <-b.timer.C:
b.RunGC(now.UnixNano())
}
if b.gcStarted {
return
}

b.gcStarted = true

go func() {
for {
select {
case <-b.done:
b.timer.Stop()
return
case now := <-b.timer.C:
b.RunGC(now.UnixNano())
}
}()
})
}
}()
}

func (b *Backend[K, V]) RunGC(now int64) {
b.mu.Lock()
defer b.mu.Unlock()

var (
overflowed map[*ringlist.Element[Record[V]]]bool
numOverflowed = b.overflow()
earliest int64
deleted []*ringlist.Element[Record[K, V]]
)

if numOverflowed > 0 {
overflowed = make(map[*ringlist.Element[Record[V]]]bool, numOverflowed)
b.list.Do(func(e *ringlist.Element[Record[V]]) bool {
if len(overflowed) == numOverflowed {
return false
}

overflowed[e] = true

return true
})
}

var earliest int64

for key, elem := range b.xmap {
if len(overflowed) > 0 && overflowed[elem] {
delete(overflowed, elem)
b.delete(key, elem)
continue
}

b.list.Do(func(elem *ringlist.Element[Record[K, V]]) bool {
deadline := elem.Value.deadline

if deadline > 0 && deadline < now {
b.delete(key, elem)
continue
deleted = append(deleted, elem)
return true
}

if deadline > 0 && (earliest == 0 || deadline < earliest) {
earliest = deadline
}
}

if b.needRealloc {
b.needRealloc = false
b.xmap = maps.Clone(b.xmap)
return true
})

for _, elem := range deleted {
b.delete(elem)
}

b.earliestExpireAt = earliest
Expand Down
Loading

0 comments on commit 6bf39b7

Please sign in to comment.