This repository has been archived by the owner on Oct 28, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlog_store.go
116 lines (104 loc) · 2.48 KB
/
log_store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package arctonyx
import (
"encoding/json"
"github.com/dgraph-io/badger"
"github.com/readystock/raft"
)
// logStore is Store under a distinct named type. The log-oriented methods in
// this file (FirstIndex, LastIndex, GetLog, StoreLog, StoreLogs, DeleteRange)
// are declared on it and operate on the store's badger database.
// NOTE(review): presumably this exists so the same store can be handed to raft
// as its log store — confirm against the caller that does the conversion.
type logStore Store
var (
// logsPrefix is the byte prefix placed in front of every log key built by
// getKeyForIndex; index scans the database for keys carrying this prefix.
logsPrefix = []byte("/_logs_/")
)
// FirstIndex reports the log index found by a forward scan of the log keys.
// It returns whatever index(false) yields: the index decoded from the first
// log key encountered, or 0 when the scan finds none.
func (log *logStore) FirstIndex() (val uint64, err error) {
	val, err = log.index(false)
	return val, err
}
// LastIndex reports the log index found by a reverse scan of the log keys.
// It returns whatever index(true) yields: the index decoded from the first
// log key encountered when iterating backwards, or 0 when the scan finds none.
func (log *logStore) LastIndex() (uint64, error) {
	const reverse = true
	return log.index(reverse)
}
// GetLog loads the log entry stored under index and JSON-decodes it into
// raftLog.
//
// It returns raft.ErrLogNotFound only when the key for index does not exist.
// Any other storage error, and any decoding error, is returned unchanged so
// that real I/O failures are not mistaken for a missing entry.
func (log *logStore) GetLog(index uint64, raftLog *raft.Log) error {
	return log.badger.View(func(txn *badger.Txn) error {
		item, err := txn.Get(getKeyForIndex(index))
		if err == badger.ErrKeyNotFound {
			return raft.ErrLogNotFound
		}
		if err != nil {
			return err
		}

		// Passing nil lets badger allocate a buffer of the right size.
		value, err := item.ValueCopy(nil)
		if err != nil {
			return err
		}

		// raftLog is already a pointer: decode straight into the caller's
		// value. Going through &raftLog would let a nil raftLog "succeed" by
		// decoding into a throwaway allocation the caller never sees.
		return json.Unmarshal(value, raftLog)
	})
}
// StoreLog persists a single log entry by handing it to StoreLogs as a
// one-element batch.
func (log *logStore) StoreLog(raftLog *raft.Log) error {
	batch := []*raft.Log{raftLog}
	return log.StoreLogs(batch)
}
// StoreLogs writes every entry of raftLogs inside one badger update
// transaction. Each entry is JSON-encoded and stored under the key produced by
// getKeyForIndex for its Index. The first encoding or write error stops the
// loop and is returned from the transaction function.
func (log *logStore) StoreLogs(raftLogs []*raft.Log) error {
	persist := func(txn *badger.Txn) error {
		for i := range raftLogs {
			entry := raftLogs[i]
			key := getKeyForIndex(entry.Index)
			encoded, err := json.Marshal(entry)
			if err != nil {
				return err
			}
			if err := txn.Set(key, encoded); err != nil {
				return err
			}
		}
		return nil
	}
	return log.badger.Update(persist)
}
// DeleteRange removes every stored log entry whose index lies in the
// inclusive range [min, max].
//
// The matching keys are collected first and then deleted, all inside a single
// badger update transaction. The first delete error is returned.
//
// NOTE(review): the range scan relies on uint64ToBytes producing keys whose
// byte order matches numeric order (e.g. fixed-width big-endian) — confirm.
func (log *logStore) DeleteRange(min, max uint64) error {
	return log.badger.Update(func(txn *badger.Txn) error {
		it := txn.NewIterator(badger.IteratorOptions{
			PrefetchSize: 100,
		})
		defer it.Close()

		var keys [][]byte

		// Start at the prefixed key for min and stay inside the log prefix,
		// so that neither lower-indexed log entries nor unrelated keys in the
		// same database are visited.
		for it.Seek(getKeyForIndex(min)); it.ValidForPrefix(logsPrefix); it.Next() {
			key := it.Item().Key()
			if getIndexForKey(key) > max {
				break
			}
			// The slice returned by Key is only valid until the iterator
			// advances, so keep a private copy for the delete pass below.
			keys = append(keys, append([]byte(nil), key...))
		}

		// Delete all of the keys found.
		for _, key := range keys {
			if err := txn.Delete(key); err != nil {
				return err
			}
		}
		return nil
	})
}
// getKeyForIndex builds the database key for a log index: a fresh slice
// holding logsPrefix followed by the bytes returned by uint64ToBytes(index).
func getKeyForIndex(index uint64) []byte {
	prefixed := append([]byte{}, logsPrefix...)
	return append(prefixed, uint64ToBytes(index)...)
}
func getIndexForKey(key []byte) uint64 {
return bytesToUint64(key[len(logsPrefix):])
}
// index returns the index encoded in the first log key met when scanning the
// logsPrefix key range, forwards when reverse is false and backwards when it
// is true. It returns 0 when no key with the prefix is found, together with
// any error reported by the badger view transaction.
func (log *logStore) index(reverse bool) (val uint64, err error) {
	// A forward scan starts at the bare prefix, which sorts before every log
	// key. A reverse scan must start at or after the greatest log key:
	// seeking to the bare prefix in reverse would land before all log keys
	// and find nothing.
	// NOTE(review): assumes uint64ToBytes is a fixed-width binary encoding, so
	// the all-ones index yields the greatest possible log key — confirm.
	seekKey := logsPrefix
	if reverse {
		seekKey = getKeyForIndex(^uint64(0))
	}

	err = log.badger.View(func(txn *badger.Txn) error {
		it := txn.NewIterator(badger.IteratorOptions{
			PrefetchSize:   1,
			PrefetchValues: false, // only the key is read
			Reverse:        reverse,
		})
		defer it.Close()

		// Only the first positioned item matters; no loop is needed.
		it.Seek(seekKey)
		if it.ValidForPrefix(logsPrefix) {
			val = getIndexForKey(it.Item().Key())
		}
		return nil
	})
	return val, err
}