Skip to content

Commit

Permalink
Enable persiting cache to disk
Browse files Browse the repository at this point in the history
If implemented, the cache will be persisted to disk by calling
`Persist`. It will be loaded when instantiating a cache by calling `New`
if an existing `path` is provided.

Signed-off-by: Soule BA <[email protected]>
  • Loading branch information
souleb committed Jun 17, 2024
1 parent 61276f4 commit da1e921
Show file tree
Hide file tree
Showing 4 changed files with 353 additions and 14 deletions.
220 changes: 215 additions & 5 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ limitations under the License.
package cache

import (
"bufio"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"slices"
"sort"
"sync"
Expand Down Expand Up @@ -57,23 +63,26 @@ type cache[T any] struct {
index map[string]*item[T]
// items is the store of elements in the cache.
items []*item[T]
// sorted indicates whether the items are sorted by expiration time.
// It is initially true, and set to false when the items are not sorted.
sorted bool

// capacity is the maximum number of index the cache can hold.
capacity int
metrics *cacheMetrics
labelsFunc GetLvsFunc[T]
janitor *janitor[T]
closed bool
path string
buf buffer
// sorted indicates whether the items are sorted by expiration time.
// It is initially true, and set to false when the items are not sorted.
sorted bool
closed bool

mu sync.RWMutex
}

var _ Expirable[any] = &Cache[any]{}

// New creates a new cache with the given configuration.
func New[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T], error) {
func New[T any](capacity int, path string, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T], error) {
opt, err := makeOptions(opts...)
if err != nil {
return nil, fmt.Errorf("failed to apply options: %w", err)
Expand All @@ -84,6 +93,7 @@ func New[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T]
items: make([]*item[T], 0, capacity),
sorted: true,
capacity: capacity,
path: path,
labelsFunc: opt.labelsFunc,
janitor: &janitor[T]{
interval: opt.interval,
Expand All @@ -97,6 +107,16 @@ func New[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T]

C := &Cache[T]{cache: c, keyFunc: keyFunc}

if c.path != "" {
// load the cache from the file if it exists
if _, err := os.Stat(c.path); err == nil {
err = c.load()
if err != nil {
return nil, err
}
}
}

if opt.interval > 0 {
go c.janitor.run(c)
}
Expand Down Expand Up @@ -498,3 +518,193 @@ func (j *janitor[T]) run(c *cache[T]) {
}
}
}

// buffer is a helper type used to write data to a byte slice
type buffer []byte

// clear clears the buffer
func (s *buffer) clear() {
*s = (*s)[:0]
}

// writeByteSlice writes a byte slice to the buffer
func (s *buffer) writeByteSlice(v []byte) {
*s = append(*s, v...)
}

// writeUint64 writes a uint64 to the buffer
// it is written in little endian format
func (s *buffer) writeUint64(v uint64) {
var buf [8]byte
binary.LittleEndian.PutUint64(buf[:], v)
*s = append(*s, buf[:]...)
}

// writeBuf writes the buffer to the file
func (c *cache[T]) writeBuf(file *os.File) error {
if _, err := file.Write(c.buf); err != nil {
return err
}
// sync the file to disk straight away
file.Sync()
return nil
}

// Persist writes the cache to disk
// The cache is written to a temporary file first
// and then renamed to the final file name to atomically
// update the cache file. This is done to avoid corrupting
// the cache file in case of a crash while writing to the file. If a file
// with the same name exists, it is overwritten.
// The cache file is written in the following format:
// key length, key, expiration, data length, data // repeat for each item
// The key length and data length are written as uint64 in little endian format
// The expiration is written as a unix timestamp in seconds as uint64 in little endian format
// The key is written as a byte slice
// The data is written as a json encoded byte slice
func (c *cache[T]) Persist() error {
c.mu.Lock()
defer c.mu.Unlock()

if err := c.writeToBuf(); err != nil {
return err
}

// create new temp file
newFile, err := os.Create(fmt.Sprintf("%s.tmp", c.path))
if err != nil {
errf := os.Remove(fmt.Sprintf("%s.tmp", c.path))
return errors.Join(err, errf)
}

if err := c.writeBuf(newFile); err != nil {
errf := os.Remove(fmt.Sprintf("%s.tmp", c.path))
return errors.Join(err, errf)
}

// close the file
if err := newFile.Close(); err != nil {
errf := os.Remove(fmt.Sprintf("%s.tmp", c.path))
return errors.Join(err, errf)
}

if err := os.Rename(fmt.Sprintf("%s.tmp", c.path), c.path); err != nil {
return fmt.Errorf("failed to rename file: %w", err)
}

return nil
}

// writeToBuf writes the cache to the buffer
// no locks are taken, the caller should ensure that
// the cache is not being modified while this function is called.
func (c *cache[T]) writeToBuf() error {
c.buf.clear()
for _, item := range c.items {
data, err := json.Marshal(item.object)
if err != nil {
return err
}

// write the key, expiration and data to the buffer
// format: key length, key, expiration, data length, data
// doing this this way, gives us the ability to read the file
// without having to read the entire file into memory. This is
// done for possible future use cases e.g. where the cache file
// could be very large or for range queries.
c.buf.writeUint64(uint64(len(item.key)))
c.buf.writeByteSlice([]byte(item.key))
c.buf.writeUint64(uint64(item.expiresAt.UnixNano()))
c.buf.writeUint64(uint64(len(data)))
c.buf.writeByteSlice(data)
}
return nil
}

// load reads the cache from disk
// The cache file is read in the following format:
// key length, key, expiration, data length, data // repeat for each item
// This function cannot be called concurrently, and should be called
// before the cache is used.
func (c *cache[T]) load() error {
file, err := os.Open(c.path)
if err != nil {
return err
}
defer file.Close()

rd := bufio.NewReader(file)
items, err := c.readFrom(rd)
if err != nil {
return err
}

for _, item := range items {
if len(c.items) >= c.capacity {
break
}
c.items = append(c.items, item)
c.index[item.key] = item
}

if len(c.items) > 0 {
c.metrics.setCachedItems(float64(len(c.items)))
c.sorted = false
}
return nil
}

func (c *cache[T]) readFrom(rd io.Reader) ([]*item[T], error) {
items := make([]*item[T], 0)
for {
// read until EOF
item, err := c.readItem(rd)
if err != nil {
if err == io.EOF {
break
}
return nil, err
}
items = append(items, item)
}
return items, nil
}

func (c *cache[T]) readItem(rd io.Reader) (*item[T], error) {
var (
buf = make([]byte, 8)
item item[T]
)
if _, err := io.ReadFull(rd, buf); err != nil {
if err == io.EOF {
return nil, err
}
return nil, err
}
keyLen := binary.LittleEndian.Uint64(buf)
key := make([]byte, keyLen)
if _, err := io.ReadFull(rd, key); err != nil {
return nil, err
}
item.key = string(key)

if _, err := io.ReadFull(rd, buf); err != nil {
return nil, err
}
item.expiresAt = time.Unix(int64(binary.LittleEndian.Uint64(buf)), 0)

if _, err := io.ReadFull(rd, buf); err != nil {
return nil, err
}
dataLen := binary.LittleEndian.Uint64(buf)
data := make([]byte, dataLen)
if _, err := io.ReadFull(rd, data); err != nil {
return nil, err
}

if err := json.Unmarshal(data, &item.object); err != nil {
return nil, err
}

return &item, nil
}
Loading

0 comments on commit da1e921

Please sign in to comment.