Bigcache code analysis #41
https://github.com/allegro/bigcache/blob/d3696373dd1331cdc65e20cea65d351bee988a8e/bigcache.go

package bigcache
import (
"log"
"sync"
"github.com/allegro/bigcache/queue"
)
const (
minimumEntriesInShard = 10 // minimum number of entries in a single shard
)
type BigCache struct {
shards []*cacheShard
lifeWindow uint64
clock clock
hash hash
config Config
}
type cacheShard struct {
hashmap map[uint64]uint32
entries queue.BytesQueue
lock sync.RWMutex
}
// NewBigCache 初始化一个BigCache
func NewBigCache(config Config) *BigCache {
return newBigCache(config, &systemClock{})
}
func newBigCache(config Config, clock clock) *BigCache {
cache := &BigCache{
shards: make([]*cacheShard, config.Shards),
lifeWindow: uint64(config.LifeWindow.Seconds()),
clock: clock,
hash: fnv64a{},
config: config,
}
// config.MaxEntriesInWindow is the maximum number of entries expected within the life window;
// it is used to size each shard appropriately. If set well, the cache will not have to allocate additional memory.
shardSize := max(config.MaxEntriesInWindow/config.Shards, minimumEntriesInShard)
for i := 0; i < config.Shards; i++ {
cache.shards[i] = &cacheShard{
hashmap: make(map[uint64]uint32, shardSize),
entries: *queue.NewBytesQueue(shardSize*config.MaxEntrySize, config.Verbose),
}
}
return cache
}
// Get reads entry for the key
func (c *BigCache) Get(key string) ([]byte, error) {
hashedKey := c.hash.sum(key)
shard := c.getShard(hashedKey)
shard.lock.RLock()
defer shard.lock.RUnlock()
itemIndex := shard.hashmap[hashedKey]
if itemIndex == 0 {
return nil, notFound(key)
}
wrappedEntry, err := shard.entries.Get(int(itemIndex))
if err != nil {
return nil, err
}
if entryKey := readKeyFromEntry(wrappedEntry); key != entryKey {
if c.config.Verbose {
log.Printf("Collision detected. Both %q and %q has same hash %x", key, entryKey, hashedKey)
}
return nil, notFound(key)
}
return readEntry(wrappedEntry), nil
}
// Set saves entry under the key
func (c *BigCache) Set(key string, entry []byte) {
hashedKey := c.hash.sum(key)
shard := c.getShard(hashedKey)
shard.lock.Lock()
defer shard.lock.Unlock()
currentTimestamp := uint64(c.clock.epoch())
if previousIndex := shard.hashmap[hashedKey]; previousIndex != 0 {
if previousEntry, err := shard.entries.Get(int(previousIndex)); err == nil {
resetKeyFromEntry(previousEntry)
}
}
if oldestEntry, err := shard.entries.Peek(); err == nil {
c.onEvict(oldestEntry, currentTimestamp, func() {
shard.entries.Pop()
hash := readHashFromEntry(oldestEntry)
delete(shard.hashmap, hash)
})
}
w := wrapEntry(currentTimestamp, hashedKey, key, entry)
index := shard.entries.Push(w)
shard.hashmap[hashedKey] = uint32(index)
}
func (c *BigCache) onEvict(oldestEntry []byte, currentTimestamp uint64, evict func()) {
oldestTimestamp := readTimestampFromEntry(oldestEntry)
if currentTimestamp-oldestTimestamp > c.lifeWindow {
evict()
}
}
func (c *BigCache) getShard(hashedKey uint64) (shard *cacheShard) {
shardKey := hashedKey % uint64(len(c.shards))
return c.shards[shardKey]
}
func max(a, b int) int {
if a > b {
return a
}
return b
}
package queue
import (
"encoding/binary"
"log"
"time"
)
const (
headerEntrySize = 4 // Size of the entry header, which records the entry's length
leftMarginIndex = 1 // Bytes before left margin are not used. Zero index means element does not exist in queue, useful while reading slice from index
)
// BytesQueue is a non-thread-safe FIFO queue type implemented on top of a Go byte array.
// For every push operation index of entry is returned. It can be used to read the entry later
type BytesQueue struct {
array []byte
capacity int
head int
tail int
count int
rightMargin int
headerBuffer []byte
verbose bool
}
type queueError struct {
message string
}
// NewBytesQueue initializes a new bytes queue.
// The initial capacity is used for the byte array allocation.
// When the verbose flag is set, information about memory allocations is printed.
func NewBytesQueue(initialCapacity int, verbose bool) *BytesQueue {
return &BytesQueue{
array: make([]byte, initialCapacity),
capacity: initialCapacity,
headerBuffer: make([]byte, headerEntrySize),
tail: leftMarginIndex,
head: leftMarginIndex,
verbose: verbose,
}
}
// Push copies entry at the end of queue and moves tail pointer. Allocates more space if needed.
// Returns index for pushed data
func (q *BytesQueue) Push(data []byte) int {
dataLen := len(data)
if q.availableSpaceAfterTail() < dataLen+headerEntrySize {
if q.availableSpaceBeforeHead() >= dataLen+headerEntrySize {
q.tail = leftMarginIndex
} else {
q.allocateAdditionalMemory()
}
}
index := q.tail
q.push(data, dataLen)
return index
}
func (q *BytesQueue) allocateAdditionalMemory() {
start := time.Now()
q.capacity = q.capacity * 2
newArray := make([]byte, q.capacity)
copy(newArray[leftMarginIndex:], q.array[q.head:q.rightMargin])
newTail := q.rightMargin - q.head + leftMarginIndex
if q.tail <= q.head {
copy(newArray[newTail:], q.array[leftMarginIndex:q.tail])
newTail += q.tail - leftMarginIndex
}
if q.verbose {
log.Printf("Allocated new queue. Took: %dms, Capacity: %d \n", time.Since(start)/time.Millisecond, q.capacity)
}
q.array = newArray
q.head = leftMarginIndex
q.tail = newTail
q.rightMargin = newTail
}
func (q *BytesQueue) push(data []byte, len int) {
binary.LittleEndian.PutUint32(q.headerBuffer, uint32(len))
q.copy(q.headerBuffer, headerEntrySize)
q.copy(data, len)
if q.tail > q.head {
q.rightMargin = q.tail
}
q.count++
}
func (q *BytesQueue) copy(data []byte, len int) {
q.tail += copy(q.array[q.tail:], data[:len])
}
// Pop reads the oldest entry from queue and moves head pointer to the next one
func (q *BytesQueue) Pop() ([]byte, error) {
if q.count == 0 {
return nil, &queueError{"Empty queue"}
}
data, size := q.peek(q.head)
q.head += headerEntrySize + size
q.count--
if q.head == q.rightMargin {
q.head = leftMarginIndex
if q.tail == q.rightMargin {
q.tail = leftMarginIndex
}
q.rightMargin = q.tail
}
return data, nil
}
// Peek reads the oldest entry from list without moving head pointer
func (q *BytesQueue) Peek() ([]byte, error) {
if q.count == 0 {
return nil, &queueError{"Empty queue"}
}
data, _ := q.peek(q.head)
return data, nil
}
// Get reads entry from index
func (q *BytesQueue) Get(index int) ([]byte, error) {
if index <= 0 {
return nil, &queueError{"Index must be greater than zero. Invalid index."}
}
data, _ := q.peek(index)
return data, nil
}
// Capacity returns number of allocated bytes for queue
func (q *BytesQueue) Capacity() int {
return q.capacity
}
// Len returns number of entries kept in queue
func (q *BytesQueue) Len() int {
return q.count
}
// Error returns error message
func (e *queueError) Error() string {
return e.message
}
func (q *BytesQueue) peek(index int) ([]byte, int) {
blockSize := int(binary.LittleEndian.Uint32(q.array[index : index+headerEntrySize]))
return q.array[index+headerEntrySize : index+headerEntrySize+blockSize], blockSize
}
func (q *BytesQueue) availableSpaceAfterTail() int {
if q.tail >= q.head {
return q.capacity - q.tail
}
return q.head - q.tail
}
func (q *BytesQueue) availableSpaceBeforeHead() int {
if q.tail >= q.head {
return q.head - leftMarginIndex
}
return q.head - q.tail
}
https://github.com/allegro/bigcache/blob/d3696373dd1331cdc65e20cea65d351bee988a8e/encoding.go

package bigcache
import (
"encoding/binary"
)
const (
timestampSizeInBytes = 8 // Number of bytes used for timestamp
hashSizeInBytes = 8 // Number of bytes used for hash
keySizeInBytes = 2 // Number of bytes used for size of entry key
headersSizeInBytes = timestampSizeInBytes + hashSizeInBytes + keySizeInBytes // Number of bytes used for all headers
)
func wrapEntry(timestamp uint64, hash uint64, key string, entry []byte) []byte {
var blob []byte
keyLength := len(key)
blob = make([]byte, len(entry)+headersSizeInBytes+keyLength)
binary.LittleEndian.PutUint64(blob, timestamp)
binary.LittleEndian.PutUint64(blob[timestampSizeInBytes:], hash)
binary.LittleEndian.PutUint16(blob[timestampSizeInBytes+hashSizeInBytes:], uint16(keyLength))
copy(blob[headersSizeInBytes:], []byte(key))
copy(blob[headersSizeInBytes+keyLength:], entry)
return blob
}
func readEntry(data []byte) []byte {
length := binary.LittleEndian.Uint16(data[timestampSizeInBytes+hashSizeInBytes:])
return data[headersSizeInBytes+length:]
}
func readTimestampFromEntry(data []byte) uint64 {
return binary.LittleEndian.Uint64(data)
}
func readKeyFromEntry(data []byte) string {
length := binary.LittleEndian.Uint16(data[timestampSizeInBytes+hashSizeInBytes:])
return string(data[headersSizeInBytes : headersSizeInBytes+length])
}
func readHashFromEntry(data []byte) uint64 {
return binary.LittleEndian.Uint64(data[timestampSizeInBytes:])
}
func resetKeyFromEntry(data []byte) {
binary.LittleEndian.PutUint64(data[timestampSizeInBytes:], 0)
}
With the release of Go 1.9, the performance of sync.Map is already not bad.
Sharding
The whole cache is split into N independent shards, and hash(key) % N decides which shard an entry goes into (the version pasted above uses a plain modulo; newer versions use a bitwise AND with N-1 when N is a power of two). Each shard then has its own RWMutex lock.
The code related to sharding can be simplified as follows:
https://github.com/allegro/bigcache/blob/master/bigcache.go
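To isolate just the sharding idea, here is a minimal, stand-alone sketch; it is not the upstream code, and the type and function names are made up for this example. It deliberately stores values straight in a map to keep the point simple; the next section explains why bigcache instead stores only an index.

```go
package shardedcache

import (
	"hash/fnv"
	"sync"
)

// shard is a stand-in for bigcache's cacheShard: each shard owns
// its own data and, crucially, its own RWMutex.
type shard struct {
	lock sync.RWMutex
	data map[uint64][]byte
}

// Cache splits the key space over N independent shards.
type Cache struct {
	shards []*shard
}

func New(n int) *Cache {
	c := &Cache{shards: make([]*shard, n)}
	for i := range c.shards {
		c.shards[i] = &shard{data: make(map[uint64][]byte)}
	}
	return c
}

// getShard picks a shard by hash(key) % N, so operations on keys that land
// in different shards never contend on the same lock.
func (c *Cache) getShard(key string) (*shard, uint64) {
	h := fnv.New64a()
	h.Write([]byte(key))
	sum := h.Sum64()
	return c.shards[sum%uint64(len(c.shards))], sum
}

func (c *Cache) Set(key string, value []byte) {
	s, sum := c.getShard(key)
	s.lock.Lock()
	defer s.lock.Unlock()
	s.data[sum] = value
}

func (c *Cache) Get(key string) ([]byte, bool) {
	s, sum := c.getShard(key)
	s.lock.RLock()
	defer s.lock.RUnlock()
	v, ok := s.data[sum]
	return v, ok
}
```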
Storing keys in a map
After going around in such a big circle, doesn't it still end up using a map? Yes, but not quite in the usual way.
The point of this approach comes from an optimization in Go 1.5: if neither the keys nor the values of a map contain pointers, the GC will skip scanning that map entirely. A map's O(1) access is still very attractive, so as long as both key and value are pointer-free, a map is well worth using.
However, only numbers and bools are pointer-free. Does that mean this cache can only support int -> int key/value pairs? That is not realistic. So the author takes a roundabout route: every value is stored in a bytes queue (described below), and what the map keeps is the index of that value's header inside the queue; the value is accessed through this index, which is naturally an int. A string key only needs to be hashed to become an int as well. In this way both key and value are turned into integers.
Moreover, what the map then has to hold is tiny; compared with storing the whole key/value pair, it takes far less space.
To sum up, each cache shard holds a map recording the "hash(key) -> valueIndex" relation (map[uint64]uint32 in the code above), plus a bytes queue that stores the values. The simplified source for this part is here:
https://github.com/allegro/bigcache/blob/master/shard.go
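To make the GC point concrete, here is a minimal sketch of such a pointer-free index; the type and method names are illustrative, not the upstream ones, but the field types mirror the pasted cacheShard.

```go
package shardedcache

// Because both the key (uint64) and the value (uint32) are pointer-free,
// the garbage collector never has to scan this map, no matter how large
// it grows. The actual values live in one flat byte buffer.
type shardIndex struct {
	hashmap map[uint64]uint32 // hash(key) -> index of the entry inside `entries`
	entries []byte            // all wrapped entries laid out back to back
}

// lookup returns the byte index of a key's entry, mirroring how the pasted
// cacheShard resolves hashedKey -> itemIndex before calling Get on its
// BytesQueue. An index of zero means "not present".
func (s *shardIndex) lookup(hashedKey uint64) (uint32, bool) {
	idx, ok := s.hashmap[hashedKey]
	if !ok || idx == 0 {
		return 0, false
	}
	return idx, true
}
```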
Notice that Get/Set do not operate on the raw value directly; they operate on an "entry" built by packing the key, the value and the current timestamp together.
The packed structure is simple: a header is prepended to the original value. Conceptually you can think of it as fmt.Sprintf("%d%d%s", currentTimestamp, len(value), value); in the encoding.go pasted above the header actually holds the timestamp, hash(key) and the key length, followed by the key and then the value (the entry's total length is recorded separately in the bytes queue's own 4-byte header), and for efficiency it is built with the binary package directly on a []byte rather than with string formatting.
The purpose is clear: record when a value was stored and how large it is. The timestamp is used to implement the "expire" behaviour once the life window has passed, and the recorded lengths are what allow entries to be located by index inside the byte queue.
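Based on the encoding.go pasted above, the wrapped entry has a fixed 18-byte header. The sketch below unpacks such an entry; the function is illustrative (it is not in the repo), simply the inverse of wrapEntry.

```go
package bigcache

import "encoding/binary"

// Entry layout produced by wrapEntry:
//
//   [0, 8)            little-endian uint64 timestamp
//   [8, 16)           little-endian uint64 hash(key)
//   [16, 18)          little-endian uint16 key length
//   [18, 18+keyLen)   the key bytes
//   [18+keyLen, ...)  the value bytes
//
// unwrapEntry reverses that packing.
func unwrapEntry(blob []byte) (timestamp uint64, hash uint64, key string, value []byte) {
	timestamp = binary.LittleEndian.Uint64(blob[0:8])
	hash = binary.LittleEndian.Uint64(blob[8:16])
	keyLen := int(binary.LittleEndian.Uint16(blob[16:18]))
	key = string(blob[18 : 18+keyLen])
	value = blob[18+keyLen:]
	return
}
```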
Bytes queue
As mentioned several times above, all values are stored in a single bytes queue.
The core idea is to append the packed values into a []byte and maintain head, tail, capacity, count and so on. There is not much to say about the implementation itself: it is a textbook queue, providing the basic operations a queue needs, such as Reset, Push, Pop, Peek, Capacity and Len.
A bytes queue is given a maximum size, maxCapacity, when it is created, but the array that actually stores the data is not allocated at that size up front; it starts at a fairly small value and grows dynamically as it is used, doubling its capacity each time (up to the maximum). The simplified code related to growing is here:
https://github.com/allegro/bigcache/blob/master/queue/bytes_queue.go
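For reference, a short usage sketch of the BytesQueue API as pasted above (the two-argument constructor of that commit), assuming the package is importable as github.com/allegro/bigcache/queue:

```go
package main

import (
	"fmt"

	"github.com/allegro/bigcache/queue"
)

func main() {
	// Start with a small capacity; the queue doubles it whenever an
	// entry no longer fits.
	q := queue.NewBytesQueue(64, true)

	// Push returns the index at which the entry was stored...
	idx := q.Push([]byte("hello"))

	// ...and Get reads it back later by that same index.
	entry, _ := q.Get(idx)
	fmt.Printf("index=%d entry=%q len=%d cap=%d\n", idx, entry, q.Len(), q.Capacity())

	// Pop removes the oldest entry and advances the head pointer.
	oldest, _ := q.Pop()
	fmt.Printf("popped=%q remaining=%d\n", oldest, q.Len())
}
```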
Other notes
When computing hash(key), BigCache does not use common algorithms such as MD5 or SHA-1 but the FNV algorithm, which is faster, uses less memory, and fits this hash-key-generation scenario well. It is also very simple to implement; the 64-bit implementation in the source is here:
https://github.com/allegro/bigcache/blob/master/fnv.go
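For reference, 64-bit FNV-1a is only a few lines. The sketch below uses the standard FNV-1a offset basis and prime and matches the shape of the fnv64a hasher referenced in the pasted bigcache.go (which calls c.hash.sum(key)); treat it as a sketch rather than a verbatim copy of the repo's fnv.go.

```go
package bigcache

const (
	// Standard FNV-1a 64-bit parameters.
	offset64 = 14695981039346656037
	prime64  = 1099511628211
)

// fnv64a hashes a string with FNV-1a: XOR each byte into the hash,
// then multiply by the FNV prime.
type fnv64a struct{}

func (f fnv64a) sum(key string) uint64 {
	var hash uint64 = offset64
	for i := 0; i < len(key); i++ {
		hash ^= uint64(key[i])
		hash *= prime64
	}
	return hash
}
```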
References
http://45.63.122.37/2016/11/18/用go实现一个very-fast-k-v-缓存/