-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbitcask.go
275 lines (228 loc) · 6.89 KB
/
bitcask.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
// Package bitcask provides functionality to create and manipulate a key-value datastore.
package bitcask
import (
"errors"
"fmt"
"os"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/IslamWalid/bitcask/internal/datastore"
"github.com/IslamWalid/bitcask/internal/keydir"
"github.com/IslamWalid/bitcask/internal/recfmt"
)
// Config options accepted by Open. The values are assigned in declaration
// order starting at zero: ReadOnly=0, ReadWrite=1, SyncOnPut=2, SyncOnDemand=3.
const (
	// ReadOnly gives the bitcask process read-only permission.
	ReadOnly ConfigOpt = iota
	// ReadWrite gives the bitcask process read and write permissions.
	ReadWrite
	// SyncOnPut makes the bitcask flush every write directly to the disk.
	SyncOnPut
	// SyncOnDemand leaves it to the user to decide when to flush.
	SyncOnDemand
)
// errRequireWrite is reported when a write operation is attempted on a
// bitcask that was opened with the ReadOnly permission.
var errRequireWrite = errors.New("require write permission")
// ConfigOpt represents a config option the user can pass to Open.
type ConfigOpt int

// options groups the config options passed to Open.
type options struct {
	// syncOption is compared against SyncOnPut when choosing file flags.
	syncOption ConfigOpt
	// accessPermission is compared against ReadOnly / ReadWrite.
	accessPermission ConfigOpt
}

// Bitcask represents the bitcask object.
// It holds the metadata needed to manipulate a bitcask datastore.
// The user creates one with Open and then calls its methods to read and
// modify the datastore data.
type Bitcask struct {
	// keyDir maps each key to the record describing where its value is stored.
	keyDir keydir.KeyDir
	// usrOpts holds the options parsed from the arguments given to Open.
	usrOpts options
	// accessMu is locked around accesses to keyDir and the active file.
	accessMu sync.Mutex
	// readerCnt counts the read operations currently in progress.
	readerCnt int32
	// dataStore is the handle used to read values back from the data files.
	dataStore *datastore.DataStore
	// activeFile is the file Put appends to; Open sets it only for ReadWrite.
	activeFile *datastore.AppendFile
	// fileFlags are the os open flags used for the active and merge files.
	fileFlags int
}
// Open creates a new Bitcask object to manipulate the datastore at dataStorePath.
// It accepts ReadWrite, ReadOnly, SyncOnPut and SyncOnDemand as config options.
//
// With ReadWrite the datastore is opened with datastore.ExclusiveLock and a
// private key directory, and an active file is prepared for appending (with
// os.O_SYNC added when SyncOnPut is set). Otherwise the datastore is opened
// with datastore.SharedLock and a shared key directory, and no active file
// is created.
//
// Only one ReadWrite process can open a bitcask at a time.
// Only ReadWrite permission can create a new bitcask datastore.
// Multiple readers or a single writer may use the same datastore at the same time.
//
// Open returns the error reported by datastore.NewDataStore or keydir.New,
// and in that case the returned *Bitcask is nil.
func Open(dataStorePath string, opts ...ConfigOpt) (*Bitcask, error) {
	b := &Bitcask{}
	b.usrOpts = parseUsrOpts(opts)

	var privacy keydir.KeyDirPrivacy
	var lockMode datastore.LockMode
	if b.usrOpts.accessPermission == ReadWrite {
		privacy = keydir.PrivateKeyDir
		lockMode = datastore.ExclusiveLock

		fileFlags := os.O_CREATE | os.O_RDWR
		if b.usrOpts.syncOption == SyncOnPut {
			fileFlags |= os.O_SYNC
		}
		b.fileFlags = fileFlags
		b.activeFile = datastore.NewAppendFile(dataStorePath, b.fileFlags, datastore.Active)
	} else {
		privacy = keydir.SharedKeyDir
		lockMode = datastore.SharedLock
	}

	dataStore, err := datastore.NewDataStore(dataStorePath, lockMode)
	if err != nil {
		return nil, err
	}

	keyDir, err := keydir.New(dataStorePath, privacy)
	if err != nil {
		// The datastore was opened successfully but is about to be dropped:
		// close it so that whatever NewDataStore acquired (presumably the
		// datastore lock) is not held by a Bitcask nobody can use.
		dataStore.Close()
		return nil, err
	}

	b.dataStore = dataStore
	b.keyDir = keyDir
	return b, nil
}
// Get retrieves the value stored under key from a bitcask datastore.
// It returns an error wrapping datastore.ErrKeyNotExist when key is not in
// the key directory, or the error reported while reading the value from its
// data file.
func (b *Bitcask) Get(key string) (string, error) {
	// Join the reader group. The first reader locks accessMu on behalf of
	// all readers; later readers only raise the counter. The counter is
	// raised from a non-zero value with compare-and-swap, so a reader cannot
	// slip in after the last reader has already decided to unlock.
	for {
		cnt := atomic.LoadInt32(&b.readerCnt)
		if cnt == 0 {
			b.accessMu.Lock()
			atomic.AddInt32(&b.readerCnt, 1)
			break
		}
		if atomic.CompareAndSwapInt32(&b.readerCnt, cnt, cnt+1) {
			break
		}
	}
	// Leave the reader group. Only the reader that brings the counter to
	// zero unlocks, so the mutex is released exactly once per group.
	defer func() {
		if atomic.AddInt32(&b.readerCnt, -1) == 0 {
			b.accessMu.Unlock()
		}
	}()

	rec, isExist := b.keyDir[key]
	if !isExist {
		// %w keeps the message unchanged while allowing errors.Is matching.
		return "", fmt.Errorf("%s: %w", key, datastore.ErrKeyNotExist)
	}
	return b.dataStore.ReadValueFromFile(rec.FileId, key, rec.ValuePos, rec.ValueSize)
}
// Put stores value under key in a bitcask datastore.
// It returns an error wrapping errRequireWrite when the bitcask was opened
// ReadOnly, or the error reported while writing the record to the active file.
// On success the key directory entry for key points at the new record.
func (b *Bitcask) Put(key, value string) error {
	if b.usrOpts.accessPermission == ReadOnly {
		return fmt.Errorf("Put: %w", errRequireWrite)
	}

	b.accessMu.Lock()
	defer b.accessMu.Unlock()

	// Take the timestamp while holding the lock so that timestamps are
	// assigned in the same order the records are appended; taking it before
	// the lock let a later write carry an older timestamp.
	tstamp := time.Now().UnixMicro()

	n, err := b.activeFile.WriteData(key, value, tstamp)
	if err != nil {
		return err
	}

	b.keyDir[key] = recfmt.KeyDirRec{
		FileId:    b.activeFile.Name(),
		ValuePos:  uint32(n),
		ValueSize: uint32(len(value)),
		Tstamp:    tstamp,
	}

	return nil
}
// Delete removes a key from a bitcask datastore
// by appending the special datastore.TompStone value, which is meant to be
// dropped by a later merge.
// It returns an error wrapping errRequireWrite when the bitcask was opened
// ReadOnly, the error from Get when key cannot be read, or the error from
// Put when the tombstone cannot be written.
func (b *Bitcask) Delete(key string) error {
	if b.usrOpts.accessPermission == ReadOnly {
		return fmt.Errorf("Delete: %w", errRequireWrite)
	}

	if _, err := b.Get(key); err != nil {
		return err
	}

	// Propagate the write result: a failed tombstone write means the key
	// was not deleted and the caller has to know about it.
	return b.Put(key, datastore.TompStone)
}
// ListKeys lists all keys present in the key directory of a bitcask datastore.
// The order of the returned keys is unspecified because it follows map
// iteration order. The returned slice is never nil.
func (b *Bitcask) ListKeys() []string {
	// Join the reader group. The first reader locks accessMu on behalf of
	// all readers; later readers only raise the counter. The counter is
	// raised from a non-zero value with compare-and-swap, so a reader cannot
	// slip in after the last reader has already decided to unlock.
	for {
		cnt := atomic.LoadInt32(&b.readerCnt)
		if cnt == 0 {
			b.accessMu.Lock()
			atomic.AddInt32(&b.readerCnt, 1)
			break
		}
		if atomic.CompareAndSwapInt32(&b.readerCnt, cnt, cnt+1) {
			break
		}
	}
	// Leave the reader group. Only the reader that brings the counter to
	// zero unlocks, so the mutex is released exactly once per group.
	defer func() {
		if atomic.AddInt32(&b.readerCnt, -1) == 0 {
			b.accessMu.Unlock()
		}
	}()

	// The final length is known, so allocate the result once.
	res := make([]string, 0, len(b.keyDir))
	for key := range b.keyDir {
		res = append(res, key)
	}

	return res
}
// Fold folds over all key/value pairs in a bitcask datastore.
// fn is expected to be in the form: F(K, V, Acc) -> Acc.
// It is called once for every key whose value can be read, receiving the
// accumulator returned by the previous call (acc for the first call).
// Keys whose value cannot be read are skipped.
// Fold returns the final accumulator.
func (b *Bitcask) Fold(fn func(string, string, any) any, acc any) any {
	// Join the reader group. The first reader locks accessMu on behalf of
	// all readers; later readers only raise the counter. The counter is
	// raised from a non-zero value with compare-and-swap, so a reader cannot
	// slip in after the last reader has already decided to unlock.
	for {
		cnt := atomic.LoadInt32(&b.readerCnt)
		if cnt == 0 {
			b.accessMu.Lock()
			atomic.AddInt32(&b.readerCnt, 1)
			break
		}
		if atomic.CompareAndSwapInt32(&b.readerCnt, cnt, cnt+1) {
			break
		}
	}
	// Leave the reader group. Only the reader that brings the counter to
	// zero unlocks. Deferring this also releases the lock if fn panics.
	defer func() {
		if atomic.AddInt32(&b.readerCnt, -1) == 0 {
			b.accessMu.Unlock()
		}
	}()

	for key := range b.keyDir {
		// Get sees a non-zero reader count here and joins this reader
		// group instead of trying to lock accessMu again.
		value, err := b.Get(key)
		if err != nil {
			// Fold has no way to report an error; feeding fn an empty
			// value for a key that could not be read would corrupt the
			// accumulation, so the key is left out instead.
			continue
		}
		acc = fn(key, value, acc)
	}

	return acc
}
// Merge rearranges the bitcask datastore in a more compact form.
// Every key whose record lives outside the active file is rewritten into a
// merge file; records in the active file are kept as they are. Keys reported
// as not existing while being rewritten are dropped from the key directory.
// Once the new key directory is in place the old files are deleted, which
// reduces disk usage.
// It returns an error wrapping errRequireWrite when the bitcask was opened
// ReadOnly, or the first error hit while listing old files or writing data.
func (b *Bitcask) Merge() error {
	if b.usrOpts.accessPermission == ReadOnly {
		return fmt.Errorf("Merge: %w", errRequireWrite)
	}

	oldFiles, err := b.listOldFiles()
	if err != nil {
		return err
	}

	b.accessMu.Lock()
	newKeyDir := keydir.KeyDir{}
	mergeFile := datastore.NewAppendFile(b.dataStore.Path(), b.fileFlags, datastore.Merge)
	defer mergeFile.Close()

	for key, rec := range b.keyDir {
		if rec.FileId == b.activeFile.Name() {
			// Records in the active file are not rewritten.
			newKeyDir[key] = rec
			continue
		}

		newRec, mergeErr := b.mergeWrite(mergeFile, key)
		if mergeErr == nil {
			newKeyDir[key] = newRec
			continue
		}

		// A key that no longer exists is simply left out of the new key
		// directory. errors.Is covers a wrapped sentinel; the suffix check
		// is kept because the way mergeWrite builds its error is not
		// visible from here.
		if errors.Is(mergeErr, datastore.ErrKeyNotExist) ||
			strings.HasSuffix(mergeErr.Error(), datastore.ErrKeyNotExist.Error()) {
			continue
		}

		// The lock is released by hand, not deferred, because the success
		// path must unlock before deleting the old files.
		b.accessMu.Unlock()
		return mergeErr
	}

	b.keyDir = newKeyDir
	b.accessMu.Unlock()
	b.deleteOldFiles(oldFiles)

	return nil
}
// Sync flushes all data to the disk by syncing the active file.
// It returns an error wrapping errRequireWrite when the bitcask was opened
// ReadOnly, otherwise the result of syncing the active file.
func (b *Bitcask) Sync() error {
	if b.usrOpts.accessPermission == ReadOnly {
		// %w keeps the message unchanged while allowing errors.Is matching.
		return fmt.Errorf("Sync: %w", errRequireWrite)
	}

	return b.activeFile.Sync()
}
// Close flushes all data to the disk and closes the bitcask datastore.
// A bitcask opened with ReadWrite has its active file synced and closed
// first; the datastore handle is closed in every case.
// The bitcask object must not be used after Close.
// Results of the sync and close calls are discarded.
func (b *Bitcask) Close() {
	canWrite := b.usrOpts.accessPermission == ReadWrite
	if canWrite {
		// Same call Sync makes once its permission check has passed.
		b.activeFile.Sync()
		b.activeFile.Close()
	}

	b.dataStore.Close()
}