Skip to content

Commit

Permalink
Merge pull request #68 from sei-protocol/RawImport
Browse files Browse the repository at this point in the history
Add Raw Import and Raw Batch for Writing to SS in any order of version
  • Loading branch information
Kbhat1 authored Jul 15, 2024
2 parents cc4ff7a + da2ab01 commit 5611081
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 0 deletions.
50 changes: 50 additions & 0 deletions ss/pebbledb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,53 @@ func (b *Batch) Write() (err error) {

return b.batch.Commit(defaultWriteOpts)
}

// RawBatch wraps a pebble batch for writing kv pairs whose versions may
// arrive in any order.
type RawBatch struct {
// storage is the pebble database the batch was created from.
storage *pebble.DB
// batch accumulates the pending writes until Write commits them.
batch *pebble.Batch
}

// NewRawBatch builds a RawBatch around a fresh pebble batch obtained from
// storage. The returned error is always nil.
func NewRawBatch(storage *pebble.DB) (*RawBatch, error) {
	rb := &RawBatch{storage: storage}
	rb.batch = storage.NewBatch()
	return rb, nil
}

// Size returns the value reported by the underlying pebble batch's Len.
// NOTE(review): pebble's Batch.Len appears to report the encoded size in
// bytes (and never zero) rather than the number of entries — confirm before
// relying on Size() > 0 as an emptiness check.
func (b *RawBatch) Size() int {
return b.batch.Len()
}

// Reset resets the underlying pebble batch (presumably discarding any writes
// that have not been committed — confirm against pebble's Batch.Reset).
func (b *RawBatch) Reset() {
b.batch.Reset()
}

// set stages one entry in the batch. The key is the store-prefixed key
// MVCC-encoded with version; the value is MVCC-encoded with tombstone.
// Any error from the pebble batch is wrapped and returned.
func (b *RawBatch) set(storeKey string, tombstone int64, key, value []byte, version int64) error {
	encodedKey := MVCCEncode(prependStoreKey(storeKey, key), version)
	encodedVal := MVCCEncode(value, tombstone)

	err := b.batch.Set(encodedKey, encodedVal, nil)
	if err == nil {
		return nil
	}

	return fmt.Errorf("failed to write PebbleDB batch: %w", err)
}

// Set stages a live (non-deleted) value for key in storeKey at version.
// The value is encoded with a tombstone of 0.
func (b *RawBatch) Set(storeKey string, key, value []byte, version int64) error {
	const noTombstone int64 = 0
	return b.set(storeKey, noTombstone, key, value, version)
}

// Delete stages a deletion of key in storeKey at version by writing the
// tombstone marker value, with the tombstone set to that same version.
func (b *RawBatch) Delete(storeKey string, key []byte, version int64) error {
	tombstone := version
	marker := []byte(tombstoneVal)
	return b.set(storeKey, tombstone, key, marker, version)
}

// Write commits the staged entries using defaultWriteOpts and then closes the
// underlying pebble batch. The batch is closed whether or not the commit
// succeeds; the commit and close errors are joined into the returned error.
func (b *RawBatch) Write() (err error) {
	defer func() {
		closeErr := b.batch.Close()
		err = errors.Join(err, closeErr)
	}()

	err = b.batch.Commit(defaultWriteOpts)
	return err
}
47 changes: 47 additions & 0 deletions ss/pebbledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,53 @@ func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error {
return nil
}

// RawImport consumes entries from ch and writes each one to the store at the
// version carried by the entry, so versions may arrive in any order.
//
// db.config.ImportNumWorkers goroutines read from ch concurrently (at least
// one worker is always started) and each commits its own RawBatch every
// ImportCommitBatchSize entries. RawImport returns once ch has been closed and
// every worker has finished.
//
// The first error hit by any worker is returned instead of panicking. A worker
// that fails stops writing but keeps draining ch so the producer is never left
// blocked on a send; entries it drains are discarded, so a non-nil error means
// the import is incomplete.
func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error {
	// With zero workers nothing would ever read from ch and the call would
	// report success without importing anything; a negative count would make
	// wg.Add panic.
	numWorkers := db.config.ImportNumWorkers
	if numWorkers < 1 {
		numWorkers = 1
	}

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)

	// recordErr remembers only the first error reported by any worker.
	recordErr := func(err error) {
		mu.Lock()
		defer mu.Unlock()
		if firstErr == nil {
			firstErr = err
		}
	}

	// importEntries runs one worker's read/commit loop and returns the first
	// error it encounters.
	importEntries := func() error {
		rb, err := NewRawBatch(db.storage)
		if err != nil {
			return err
		}

		var counter int
		for entry := range ch {
			if err := rb.Set(entry.StoreKey, entry.Key, entry.Value, entry.Version); err != nil {
				// Write will never run for this batch, so release it here.
				// The Set error is the one worth reporting.
				_ = rb.batch.Close()
				return err
			}

			counter++
			if counter%ImportCommitBatchSize == 0 {
				// Write closes the batch even when the commit fails.
				if err := rb.Write(); err != nil {
					return err
				}

				rb, err = NewRawBatch(db.storage)
				if err != nil {
					return err
				}
			}
		}

		if rb.Size() > 0 {
			return rb.Write()
		}

		// Nothing to commit; still release the batch.
		return rb.batch.Close()
	}

	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go func() {
			defer wg.Done()
			if err := importEntries(); err != nil {
				recordErr(err)
				// Keep consuming so the producer can finish and close ch.
				for range ch {
				}
			}
		}()
	}

	wg.Wait()

	return firstErr
}

// RawIterate iterates over all keys and values for a store
func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte, version int64) bool) (bool, error) {
// Iterate through all keys and values for a store
Expand Down
62 changes: 62 additions & 0 deletions ss/test/storage_test_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -1192,3 +1192,65 @@ func (s *StorageTestSuite) TestDatabaseParallelIterationVersions() {
close(triggerStartCh)
wg.Wait()
}

// TestDatabaseImport imports ten entries for "store1" at version 1 through
// Import and checks that each one can be read back at that version.
func (s *StorageTestSuite) TestDatabaseImport() {
	const numEntries = 10

	keyOf := func(i int) []byte { return []byte(fmt.Sprintf("key%03d", i)) }
	valOf := func(i int) []byte { return []byte(fmt.Sprintf("value%03d", i)) }

	db, err := s.NewDB(s.T().TempDir(), s.Config)
	s.Require().NoError(err)
	defer db.Close()

	nodes := make(chan types.SnapshotNode, numEntries)
	go func() {
		// Closing the channel is what lets Import return.
		defer close(nodes)
		for i := 0; i < numEntries; i++ {
			nodes <- types.SnapshotNode{
				StoreKey: "store1",
				Key:      keyOf(i),
				Value:    valOf(i),
			}
		}
	}()

	s.Require().NoError(db.Import(1, nodes))

	for i := 0; i < numEntries; i++ {
		got, err := db.Get("store1", 1, keyOf(i))
		s.Require().NoError(err)
		s.Require().Equal(valOf(i), got)
	}
}

// TestDatabaseRawImport feeds RawImport entries whose versions are sent in
// descending order and checks that every entry can be read back at its own
// version.
func (s *StorageTestSuite) TestDatabaseRawImport() {
	db, err := s.NewDB(s.T().TempDir(), s.Config)
	s.Require().NoError(err)
	defer db.Close()

	const lastIdx = 10

	ch := make(chan types.RawSnapshotNode, 10)

	// Send versions in reverse order intentionally. A single producer is used
	// so the send order really is descending; one goroutine per entry would
	// leave the order up to the scheduler.
	go func() {
		defer close(ch)
		for i := lastIdx; i >= 0; i-- {
			ch <- types.RawSnapshotNode{
				StoreKey: "store1",
				Key:      []byte(fmt.Sprintf("key%03d", i)),
				Value:    []byte(fmt.Sprintf("value%03d", i)),
				Version:  int64(i + 1),
			}
		}
	}()

	s.Require().NoError(db.RawImport(ch))

	for i := 0; i <= lastIdx; i++ {
		val, err := db.Get("store1", int64(i+1), []byte(fmt.Sprintf("key%03d", i)))
		s.Require().NoError(err)
		s.Require().Equal([]byte(fmt.Sprintf("value%03d", i)), val)
	}
}
19 changes: 19 additions & 0 deletions ss/types/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type StateStore interface {
// Import the initial state of the store
Import(version int64, ch <-chan SnapshotNode) error

// RawImport imports kv entries into the store; each entry carries its own version and entries may arrive in any version order.
RawImport(ch <-chan RawSnapshotNode) error

// Prune attempts to prune all versions up to and including the provided
// version argument. The operation should be idempotent. An error should be
// returned upon failure.
Expand Down Expand Up @@ -73,3 +76,19 @@ type SnapshotNode struct {
Key []byte
Value []byte
}

// RawSnapshotNode is a single kv entry tagged with the version it belongs to;
// it is the element type consumed by StateStore.RawImport.
type RawSnapshotNode struct {
// StoreKey names the store the entry belongs to.
StoreKey string
Key []byte
Value []byte
// Version is the version at which the entry is written.
Version int64
}

// GetRawSnapshotNode converts node into a RawSnapshotNode by carrying over its
// StoreKey, Key and Value and attaching version. The Key and Value slices are
// shared with node, not copied.
func GetRawSnapshotNode(node SnapshotNode, version int64) RawSnapshotNode {
	var raw RawSnapshotNode
	raw.StoreKey = node.StoreKey
	raw.Key = node.Key
	raw.Value = node.Value
	raw.Version = version
	return raw
}

0 comments on commit 5611081

Please sign in to comment.