-
Notifications
You must be signed in to change notification settings - Fork 0
/
rolling_index.go
106 lines (88 loc) · 2.54 KB
/
rolling_index.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package common
import (
"strconv"
"github.com/bolaxy/errors"
)
// RollingIndex is a bounded cache of items addressed by consecutive integer
// indexes. Items are kept in insertion order in a slice; Set trims the slice
// through Roll once it holds 2*size items, so only the most recent items stay
// addressable. Positions are derived from lastIndex and len(items), which
// relies on there being no gaps between the indexes of stored items.
type RollingIndex struct {
// name is passed to the store errors returned by the methods.
name string
// size is the number of oldest items dropped by one Roll; the backing slice
// is allocated with capacity 2*size.
size int
// lastIndex is the index of the most recently appended item; the
// constructor initialises it to -1.
lastIndex int
// items holds the cached values, oldest first.
items []interface{}
}
// NewRollingIndex builds an empty RollingIndex labelled with name. The backing
// slice is pre-allocated with room for 2*size items and lastIndex starts at -1
// to mark that nothing has been stored yet.
func NewRollingIndex(name string, size int) *RollingIndex {
	capacity := 2 * size

	idx := new(RollingIndex)
	idx.name = name
	idx.size = size
	idx.lastIndex = -1
	idx.items = make([]interface{}, 0, capacity)
	return idx
}
// GetLastWindow returns every item currently cached together with the index of
// the newest one. The returned slice is the internal slice itself, not a copy.
func (r *RollingIndex) GetLastWindow() (lastWindow []interface{}, lastIndex int) {
	lastWindow = r.items
	lastIndex = r.lastIndex
	return lastWindow, lastIndex
}
// Get returns the cached items whose index is strictly greater than skipIndex.
//
// When skipIndex is beyond the newest index an empty, non-nil slice is
// returned with no error. When the item following skipIndex has already been
// dropped from the cache, an empty slice and a TooLate store error are
// returned. Otherwise the result is a sub-slice of the internal storage (it is
// not copied).
func (r *RollingIndex) Get(skipIndex int) ([]interface{}, error) {
	empty := []interface{}{}

	// Nothing newer than skipIndex has been stored.
	if r.lastIndex < skipIndex {
		return empty, nil
	}

	// Indexes are assumed to be contiguous, so the oldest cached index follows
	// from the newest index and the number of cached items.
	oldest := r.lastIndex + 1 - len(r.items)
	if oldest > skipIndex+1 {
		return empty, errors.NewStoreErr(r.name, errors.TooLate, strconv.Itoa(skipIndex))
	}

	// Position, within items, of the first element after skipIndex.
	offset := skipIndex + 1 - oldest
	return r.items[offset:], nil
}
// GetItem returns the item stored under index.
//
// A TooLate store error is returned when index is older than the oldest cached
// item, and a KeyNotFound store error when index is newer than the newest one.
func (r *RollingIndex) GetItem(index int) (interface{}, error) {
	count := len(r.items)
	// Index of the oldest cached item, assuming contiguous indexes.
	first := r.lastIndex + 1 - count
	// Position of the requested index inside items.
	offset := index - first

	switch {
	case index < first:
		return nil, errors.NewStoreErr(r.name, errors.TooLate, strconv.Itoa(index))
	case offset >= count:
		return nil, errors.NewStoreErr(r.name, errors.KeyNotFound, strconv.Itoa(index))
	}
	return r.items[offset], nil
}
// Set stores item under index.
//
// Three cases are handled:
//   - index is more than one past lastIndex (and lastIndex is non-negative):
//     a SkippedIndex store error is returned, which keeps indexes contiguous.
//   - lastIndex is negative, or index is exactly lastIndex+1: the item is
//     appended and becomes the newest; Roll is called first when the cache
//     already holds 2*size items.
//   - otherwise index refers to an existing slot: the item is overwritten in
//     place, unless that slot has already been dropped, in which case a
//     TooLate store error is returned.
func (r *RollingIndex) Set(item interface{}, index int) error {
	next := r.lastIndex + 1
	hasLast := r.lastIndex >= 0

	// Refuse to leave a gap between the newest item and the new one.
	if hasLast && index > next {
		return errors.NewStoreErr(r.name, errors.SkippedIndex, strconv.Itoa(index))
	}

	// Append path: first item ever, or the direct successor of the newest.
	if !hasLast || index == next {
		if len(r.items) >= 2*r.size {
			r.Roll()
		}
		r.items = append(r.items, item)
		r.lastIndex = index
		return nil
	}

	// Replace path: the target must still be inside the cached range.
	first := next - len(r.items)
	if index < first {
		return errors.NewStoreErr(r.name, errors.TooLate, strconv.Itoa(index))
	}

	r.items[index-first] = item
	return nil
}
// Roll discards the size oldest cached items. The remaining items are copied
// into a freshly allocated slice with capacity 2*size, so the dropped items are
// no longer referenced by the cache. lastIndex is left untouched.
//
// Roll is exported and may therefore be called when fewer than size items are
// cached; in that case there is no full batch to drop and the call is a no-op
// (slicing items at size would otherwise panic with an out-of-range bound).
func (r *RollingIndex) Roll() {
	if len(r.items) < r.size {
		return
	}

	newList := make([]interface{}, 0, 2*r.size)
	newList = append(newList, r.items[r.size:]...)
	r.items = newList
}