Skip to content

Commit

Permalink
Further near cache expiry optimizations (#103)
Browse files Browse the repository at this point in the history
* Further near cache expiry optimizations
* minor comment
* Correct version incompatibility
* Add cacheEntriesExpired and cacheEntriesPruned
* Minor corrections
  • Loading branch information
tmiddlet2666 authored Dec 13, 2024
1 parent 90a89ce commit 3fb036d
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 42 deletions.
3 changes: 3 additions & 0 deletions coherence/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,9 @@ To manage the amount of memory used by the near cache, the following options are
Note: You can specify either High-Units or Memory and in either case, optionally, a TTL.
Note: The minimum expiry time for a near cache entry is 1/4 second. This is to ensure that expiry of elements is as efficient
as possible. You will receive an error if you try to set the TTL to a lower value.
The above can be specified by passing [NearCacheOptions] within [WithNearCache] when creating a [NamedMap] or [NamedCache].
See below for various ways of creating near caches.
Expand Down
182 changes: 151 additions & 31 deletions coherence/localcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ type CacheStats interface {
GetTotalGets() int64 // the number of gets against the near cache
GetCachePrunes() int64 // the number of times the near cache was pruned
GetCachePrunesDuration() time.Duration // the duration of all prunes
GetCacheExpires() int64 // the number of times the near cache had expiry event
GetCacheEntriesPruned() int64 // the actual number of cache entries that were pruned
GetCacheExpires() int64 // the number of times the near cache expired entries
GetCacheExpiresDuration() time.Duration // the duration of all expires
GetCacheEntriesExpired() int64 // the actual number of cache entries that were expired
Size() int // the number of entries in the near cache
SizeBytes() int64 // the number of bytes used by the entries (keys and values) in the near cache
ResetStats() // reset the stats for the near cache, not including Size() or SizeBytes()
Expand All @@ -64,16 +66,20 @@ type localCacheImpl[K comparable, V any] struct {
Name string
options *localCacheOptions
sync.Mutex
data map[K]*localCacheEntry[K, V]
cacheHits int64
cacheMisses int64
cacheMissesNannos int64
cachePuts int64
cachePrunes int64
cachePrunesNannos int64
cacheExpires int64
cacheExpiresNannos int64
cacheMemory int64
data map[K]*localCacheEntry[K, V]
expiryMap map[int64]*[]K
nextExpiry time.Time
cacheHits int64
cacheMisses int64
cacheMissesNannos int64
cachePuts int64
cacheEntriesPruned int64
cachePrunes int64
cachePrunesNannos int64
cacheEntriesExpired int64
cacheExpires int64
cacheExpiresNannos int64
cacheMemory int64
}

type localCacheEntry[K comparable, V any] struct {
Expand All @@ -82,6 +88,7 @@ type localCacheEntry[K comparable, V any] struct {
ttl time.Duration
insertTime time.Time
lastAccess time.Time
expiresAt time.Time
}

type pair[K comparable] struct {
Expand Down Expand Up @@ -123,6 +130,7 @@ func (l *localCacheImpl[K, V]) PutWithExpiry(key K, value V, ttl time.Duration)
newEntry := newLocalCacheEntry[K, V](key, value, ttl)

l.updateEntrySize(newEntry, 1)
l.registerExpiry(newEntry)

prev, ok := l.data[key]

Expand Down Expand Up @@ -185,6 +193,7 @@ func (l *localCacheImpl[K, V]) Remove(key K) *V {
v, ok := l.data[key]

if ok {
l.removeExpiry(key)
delete(l.data, key)
l.updateEntrySize(v, -1)
return &v.value
Expand Down Expand Up @@ -219,6 +228,7 @@ func (l *localCacheImpl[K, V]) Clear() {
defer l.Unlock()

l.data = make(map[K]*localCacheEntry[K, V], 0)
l.expiryMap = make(map[int64]*[]K, 0)
l.updateCacheMemory(0)
}

Expand All @@ -232,26 +242,56 @@ func (l *localCacheImpl[K, V]) GetStats() CacheStats {
}

// expireEntries goes through the map to see if any entries have expired due to ttl.
// this is done in buckets of 1/4 second as so to be more efficient. this means the
// min expiry duration is 1/4 of a second.
func (l *localCacheImpl[K, V]) expireEntries() {
if len(l.expiryMap) == 0 {
return
}

var (
keysToDelete = make([]K, 0)
start = time.Now()
bucketsToRemove = make([]int64, 0)
expiryKeys = make([]int64, len(l.expiryMap))
start = time.Now()
startUnixMillis = start.UnixMilli()
index = 0
)

// check for cache expiry
for k, v := range l.data {
if v.ttl > 0 && start.Sub(v.insertTime) > v.ttl {
keysToDelete = append(keysToDelete, k)
}
if start.Before(l.nextExpiry) {
return
}

// delete all the keys that were flagged from the expiry, this may be enough to free up space
for _, k := range keysToDelete {
l.updateEntrySize(l.data[k], -1)
delete(l.data, k)
// get the keys from the map and sort them, so we are seeing the earliest first
for key := range l.expiryMap {
expiryKeys[index] = key
index++
}

if len(keysToDelete) > 0 {
sort.Slice(expiryKeys, func(p, q int) bool {
return p < q
})

for _, expireTime := range expiryKeys {
if expireTime < startUnixMillis {
// need to expire all entries for the expiry key, retrieve the entry
if v, ok := l.expiryMap[expireTime]; ok {
bucketsToRemove = append(bucketsToRemove, expireTime)
for _, k := range *v {
l.updateEntrySize(l.data[k], -1)
atomic.AddInt64(&l.cacheEntriesExpired, 1)
delete(l.data, k)
}
}
}
}

if len(bucketsToRemove) > 0 {
l.nextExpiry = time.Now().Add(time.Duration(256) * time.Millisecond)

for _, b := range bucketsToRemove {
delete(l.expiryMap, b)
}

l.registerExpireNanos(time.Since(start).Nanoseconds())
}
}
Expand Down Expand Up @@ -300,24 +340,35 @@ func (l *localCacheImpl[K, V]) pruneEntries() {
break
}
l.updateEntrySize(l.data[v.key], -1)
atomic.AddInt64(&l.cacheEntriesPruned, 1)
l.removeExpiry(v.key)
delete(l.data, v.key)
}
}
}

func newLocalCacheEntry[K comparable, V any](key K, value V, ttl time.Duration) *localCacheEntry[K, V] {
return &localCacheEntry[K, V]{
now := time.Now()
entry := &localCacheEntry[K, V]{
key: key,
value: value,
ttl: ttl,
insertTime: time.Now(),
insertTime: now,
}
if ttl > 0 {
// granularity of expiry is minimum of 250ms
entry.expiresAt = now.Add(getMillisBucket(ttl))
}

return entry
}

func newLocalCache[K comparable, V any](name string, options ...func(localCache *localCacheOptions)) *localCacheImpl[K, V] {
cache := &localCacheImpl[K, V]{
Name: name,
data: make(map[K]*localCacheEntry[K, V], 0),
Name: name,
data: make(map[K]*localCacheEntry[K, V], 0),
expiryMap: make(map[int64]*[]K, 0),
nextExpiry: time.Now().Add(time.Duration(256) * time.Millisecond),
options: &localCacheOptions{
TTL: 0,
HighUnits: 0,
Expand Down Expand Up @@ -432,6 +483,14 @@ func (l *localCacheImpl[K, V]) GetCachePuts() int64 {
return l.cachePuts
}

// GetCacheEntriesExpired returns the number of cache entries that have been
// removed by expiry. The counter is updated with atomic operations, so it is
// read atomically here as well.
func (l *localCacheImpl[K, V]) GetCacheEntriesExpired() int64 {
	return atomic.LoadInt64(&l.cacheEntriesExpired)
}

// GetCacheEntriesPruned returns the number of cache entries that have been
// removed by pruning. The counter is updated with atomic operations, so it is
// read atomically here as well.
func (l *localCacheImpl[K, V]) GetCacheEntriesPruned() int64 {
	return atomic.LoadInt64(&l.cacheEntriesPruned)
}

// GetCachePrunes returns the value of the cachePrunes counter. ResetStats
// stores to this counter atomically, so it is read atomically here as well.
func (l *localCacheImpl[K, V]) GetCachePrunes() int64 {
	return atomic.LoadInt64(&l.cachePrunes)
}
Expand Down Expand Up @@ -462,26 +521,32 @@ func (l *localCacheImpl[K, V]) GetHitRate() float32 {

// ResetStats zeroes every statistics counter of the cache: hits, misses, puts,
// prunes, expires, their accumulated durations, and the pruned/expired entry
// counts. cacheMemory and the cached data itself are left untouched.
func (l *localCacheImpl[K, V]) ResetStats() {
	// accumulated durations
	atomic.StoreInt64(&l.cachePrunesNannos, 0)
	atomic.StoreInt64(&l.cacheExpiresNannos, 0)
	atomic.StoreInt64(&l.cacheMissesNannos, 0)

	// event counters; cacheExpires is reset together with its duration so the
	// two stay consistent after a reset
	atomic.StoreInt64(&l.cachePrunes, 0)
	atomic.StoreInt64(&l.cacheExpires, 0)
	atomic.StoreInt64(&l.cacheHits, 0)
	atomic.StoreInt64(&l.cacheMisses, 0)
	atomic.StoreInt64(&l.cachePuts, 0)

	// per-entry counters
	atomic.StoreInt64(&l.cacheEntriesExpired, 0)
	atomic.StoreInt64(&l.cacheEntriesPruned, 0)
}

func (l *localCacheImpl[K, V]) String() string {
return fmt.Sprintf("localCache{name=%s, options=%v, stats=CacheStats{puts=%v, gets=%v, hits=%v, misses=%v, "+
"missesDuration=%v, hitRate=%v%%, prunes=%v, prunesDuration=%v, expires=%v, expiresDuration=%v, size=%v, memoryUsed=%v}}",
"missesDuration=%v, hitRate=%v%%, prunes=%v, prunesDuration=%v, entriesPruned=%v, expires=%v, expiresDuration=%v, entriesExpired=%v, size=%v, memoryUsed=%v}}",
l.Name, l.options, l.GetCachePuts(), l.GetTotalGets(), l.GetCacheHits(), l.GetCacheMisses(),
l.GetCacheMissesDuration(), l.GetHitRate()*100, l.GetCachePrunes(), l.GetCachePrunesDuration(),
l.GetCacheExpires(), l.GetCacheExpiresDuration(), l.Size(), formatMemory(l.cacheMemory))
l.GetCacheMissesDuration(), l.GetHitRate()*100,
l.GetCachePrunes(), l.GetCachePrunesDuration(), l.GetCacheEntriesPruned(),
l.GetCacheExpires(), l.GetCacheExpiresDuration(), l.GetCacheEntriesExpired(),
l.Size(), formatMemory(l.cacheMemory))
}

// updateEntrySize updates the cacheMemory size based upon a local entry. The
// sign indicates whether to add (positive) or remove (negative) the entry's
// size.
//
// The size is the sum of unsafe.Sizeof for each field of the entry plus the
// entry pointer itself. unsafe.Sizeof reports the static size of a type and
// does not follow pointers, strings or slices, so this is a fixed per-entry
// figure for a given K and V rather than a deep measurement.
func (l *localCacheImpl[K, V]) updateEntrySize(entry *localCacheEntry[K, V], sign int) {
	var size = int64(unsafe.Sizeof(entry.key)) + int64(unsafe.Sizeof(entry.value)) +
		int64(unsafe.Sizeof(entry.lastAccess)) + int64(unsafe.Sizeof(entry.ttl)) +
		int64(unsafe.Sizeof(entry.insertTime)) + int64(unsafe.Sizeof(entry.expiresAt)) +
		int64(unsafe.Sizeof(entry))
	l.updateCacheMemory(int64(sign) * size)
}

Expand All @@ -498,3 +563,58 @@ func formatMemory(bytesValue int64) string {
}
return printer.Sprintf("%-.1fGB", float64(bytesValue)/1024/1024/1024)
}

// registerExpiry records the entry's key in expiryMap under the entry's
// expiry time, expressed in Unix milliseconds. Entries without a positive
// ttl are not registered.
func (l *localCacheImpl[K, V]) registerExpiry(entry *localCacheEntry[K, V]) {
	if entry.ttl <= 0 {
		return
	}

	// the bucket key is the expiry instant in unix millis
	bucket := entry.expiresAt.UnixMilli()

	if keys, found := l.expiryMap[bucket]; found {
		// bucket already exists, add this key to it
		*keys = append(*keys, entry.key)
		return
	}

	// first key for this expiry time, start a new bucket
	l.expiryMap[bucket] = &[]K{entry.key}
}

// removeExpiry removes the expiry registration for key k.
//
// The entry is looked up in data to find its expiry bucket, so this must be
// called before the entry is deleted from data. Keys that are not cached, or
// whose entry has no positive ttl, are ignored. When the bucket has no keys
// left it is deleted from expiryMap.
func (l *localCacheImpl[K, V]) removeExpiry(k K) {
	entry, ok := l.data[k]
	if !ok || entry.ttl <= 0 {
		return
	}

	expiresAtMillis := entry.expiresAt.UnixMilli()

	v, ok := l.expiryMap[expiresAtMillis]
	if !ok {
		return
	}

	// filter the bucket in place, keeping every key other than the one removed;
	// this is done even for a single-key bucket so that a bucket is only ever
	// dropped when it really holds nothing but this key
	remaining := (*v)[:0]
	for _, key := range *v {
		if key != entry.key {
			remaining = append(remaining, key)
		}
	}

	if len(remaining) == 0 {
		// delete the TTL map entry as no keys are left in the slice
		delete(l.expiryMap, expiresAtMillis)
		return
	}

	*v = remaining
}

// getMillisBucket truncates ttl down to a whole multiple of 256 milliseconds
// and returns the result as a duration. The low eight bits of the millisecond
// count are cleared, so any ttl below 256ms yields zero.
func getMillisBucket(ttl time.Duration) time.Duration {
	const bucketMask = 0xFF

	millis := ttl.Milliseconds() &^ bucketMask

	return time.Duration(millis) * time.Millisecond
}
Loading

0 comments on commit 3fb036d

Please sign in to comment.