Skip to content

Commit

Permalink
update locking mechanism when set key
Browse files Browse the repository at this point in the history
  • Loading branch information
luqmansen committed Jul 20, 2022
1 parent 6baaa17 commit ac5c282
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 63 deletions.
9 changes: 2 additions & 7 deletions store_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,12 @@ func (s *DiskStorage) Set(key, value []byte) error {
data := newEntry(time.Now().UnixNano(), key, value)
dataSize, databyte := data.encode()

s.Lock()
fileID, file := s.currentFiles()
if file.Size() >= s.maxFileSize {
fileID, file = s.addNewDataFile()
}
s.Unlock()

_, offset, err := file.Write(databyte)
if err != nil {
Expand Down Expand Up @@ -232,9 +234,7 @@ func (s *DiskStorage) initKeyDir() {

currOffset += int64(totalSize)
}

}

}
}

Expand Down Expand Up @@ -264,9 +264,6 @@ func (s *DiskStorage) flush() error {

// addNewDataFile will add new datafile to file list and return its file id
func (s *DiskStorage) addNewDataFile() (int, *datafile) {
s.Lock()
defer s.Unlock()

fileID := len(s.files)
fileName := s.dbFileFullPath + "_" + strconv.Itoa(fileID)
file := openDataFile(fileName)
Expand All @@ -277,10 +274,8 @@ func (s *DiskStorage) addNewDataFile() (int, *datafile) {

// currentFiles returns the index of the currently active datafile and the
// file itself. The active file is the last element of s.files (newest is
// appended last).
func (s *DiskStorage) currentFiles() (int, *datafile) {
s.RLock() // NOTE(review): per the commit message and the s.Lock()/s.Unlock() added in Set, this RLock/RUnlock pair appears to be the REMOVED side of the diff (locking moved to the caller) — confirm against the final file before relying on this view
fileID := len(s.files) - 1 // assumes s.files is never empty; would panic otherwise — TODO confirm an initial datafile is always created at startup
f := s.files[fileID]
s.RUnlock()

return fileID, f
}
71 changes: 15 additions & 56 deletions store_disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,16 @@ import (
"testing"

"github.com/google/uuid"

"github.com/stretchr/testify/assert"
)

func initStorageHelper(name ...string) (*DiskStorage, string, func()) {
filename := ""
baseTestPath := "testdata" // location for test file, so we don't clutter root project
testFolder := uuid.NewString() // each test will get its own folder
baseTestPath := "testdata" // location for test file, so we don't clutter root project
testFolder := name[0] + "_" + uuid.NewString() // each test will get its own folder

if len(name) >= 1 {
filename = path.Join(append([]string{baseTestPath, testFolder}, name...)...)
filename = path.Join(append([]string{baseTestPath, testFolder}, name[1:]...)...)
} else {
filename = path.Join(baseTestPath, testFolder, uuid.NewString())
}
Expand Down Expand Up @@ -174,13 +173,14 @@ func TestDiskStorage_multiKey(t *testing.T) {

func TestDiskStorage_concurrent(t *testing.T) {

t.Run("concurrent 10K Key, 1KB Filesize", func(t *testing.T) {
store, _, cleanupFunc := initStorageHelper(t.Name(), "test")
defer cleanupFunc()
t.Run("concurrent 1K Key, 1KB Filesize", func(t *testing.T) {
store, filePath, _ := initStorageHelper(t.Name(), "test")
//defer cleanupFunc()

store.WithOptions(NewOptions().SetMaxFileSize("1KB"))

kv := make(map[string][]byte)
for i := 0; i <= 10_000; i++ {
for i := 0; i <= 1_000; i++ { // this will generate ~29KB of data
kv[strconv.Itoa(i)] = []byte(strconv.Itoa(i))
}

Expand Down Expand Up @@ -220,6 +220,13 @@ func TestDiskStorage_concurrent(t *testing.T) {
}(k, v)
}
wgGet.Wait()

subPath, _ := path.Split(filePath)
dirs, err := os.ReadDir(subPath)
if err != nil {
panic(err)
}
assert.Len(t, dirs, 29)
})

t.Run("concurrent 10K Key, 1MB Filesize", func(t *testing.T) {
Expand Down Expand Up @@ -318,54 +325,6 @@ func TestDiskStorage_concurrent(t *testing.T) {
wgGet.Wait()
})

t.Run("concurrent 1M Key, 100MB Filesize", func(t *testing.T) {
store, _, cleanupFunc := initStorageHelper(t.Name(), "test")
defer cleanupFunc()
store.WithOptions(NewOptions().SetMaxFileSize("100MB"))

kv := make(map[string][]byte)
for i := 0; i <= 1_000_000; i++ {
kv[strconv.Itoa(i)] = []byte(strconv.Itoa(i))
}

limitChan := make(chan struct{}, 8000) // limit maximum number of goroutine on test with race detector

var wgAdd sync.WaitGroup
for k, v := range kv {
wgAdd.Add(1)
limitChan <- struct{}{}

go func(k, v []byte) {
defer wgAdd.Done()

assert.NoError(t, store.Set(k, v))
res, err := store.Get(k)
assert.Nil(t, err)
equalByte(t, v, res)

<-limitChan
}([]byte(k), v)
}
wgAdd.Wait()

var wgGet sync.WaitGroup
for k, v := range kv {
wgGet.Add(1)
limitChan <- struct{}{}

go func(k string, v []byte) {
defer wgGet.Done()

res, err := store.Get([]byte(k))
assert.Nil(t, err)
assert.Equal(t, v, res)

<-limitChan
}(k, v)
}
wgGet.Wait()
})

}

func BenchmarkDiskStorage_Set(b *testing.B) {
Expand Down

0 comments on commit ac5c282

Please sign in to comment.