From c09628fe98577665d5d447e95fa822bf5e4f2bfa Mon Sep 17 00:00:00 2001
From: kbhat1
Date: Tue, 2 Jul 2024 16:43:49 -0400
Subject: [PATCH 1/5] Add Raw Import and Raw Batch for Writing to SS in any
 order of version

---
 ss/pebbledb/batch.go | 50 ++++++++++++++++++++++++++++++++++
 ss/pebbledb/db.go    | 64 +++++++++++++++++++++++++++++++++++++++++---
 ss/types/store.go    | 19 +++++++++++++
 3 files changed, 130 insertions(+), 3 deletions(-)

diff --git a/ss/pebbledb/batch.go b/ss/pebbledb/batch.go
index 12e6a22..d8dc003 100644
--- a/ss/pebbledb/batch.go
+++ b/ss/pebbledb/batch.go
@@ -65,3 +65,53 @@ func (b *Batch) Write() (err error) {
 
 	return b.batch.Commit(defaultWriteOpts)
 }
+
+// RawBatch is used for writing KV pairs in any order of version.
+type RawBatch struct {
+	storage *pebble.DB
+	batch   *pebble.Batch
+}
+
+func NewRawBatch(storage *pebble.DB) (*RawBatch, error) {
+	batch := storage.NewBatch()
+
+	return &RawBatch{
+		storage: storage,
+		batch:   batch,
+	}, nil
+}
+
+func (b *RawBatch) Size() int {
+	return b.batch.Len()
+}
+
+func (b *RawBatch) Reset() {
+	b.batch.Reset()
+}
+
+func (b *RawBatch) set(storeKey string, tombstone int64, key, value []byte, version int64) error {
+	prefixedKey := MVCCEncode(prependStoreKey(storeKey, key), version)
+	prefixedVal := MVCCEncode(value, tombstone)
+
+	if err := b.batch.Set(prefixedKey, prefixedVal, nil); err != nil {
+		return fmt.Errorf("failed to write PebbleDB batch: %w", err)
+	}
+
+	return nil
+}
+
+func (b *RawBatch) Set(storeKey string, key, value []byte, version int64) error {
+	return b.set(storeKey, 0, key, value, version)
+}
+
+func (b *RawBatch) Delete(storeKey string, key []byte, version int64) error {
+	return b.set(storeKey, version, key, []byte(tombstoneVal), version)
+}
+
+func (b *RawBatch) Write() (err error) {
+	defer func() {
+		err = errors.Join(err, b.batch.Close())
+	}()
+
+	return b.batch.Commit(defaultWriteOpts)
+}
diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go
index 76d099c..f44a4b0 100644
--- a/ss/pebbledb/db.go
+++ b/ss/pebbledb/db.go
@@ -496,18 +496,76 @@ func (db *Database) ReverseIterator(storeKey string, version int64, start, end [
 // Import loads the initial version of the state in parallel with numWorkers goroutines
 // TODO: Potentially add retries instead of panics
 func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error {
+	// Re route to RawImport
+	rawCh := make(chan types.RawSnapshotNode, db.config.ImportNumWorkers)
+	go func() {
+		defer close(rawCh)
+		for entry := range ch {
+			rawCh <- types.GetRawSnapshotNode(entry, version)
+		}
+	}()
+
+	return db.RawImport(rawCh)
+
+	// var wg sync.WaitGroup
+
+	// worker := func() {
+	// 	defer wg.Done()
+	// 	batch, err := NewBatch(db.storage, version)
+	// 	if err != nil {
+	// 		panic(err)
+	// 	}
+
+	// 	var counter int
+	// 	for entry := range ch {
+	// 		err := batch.Set(entry.StoreKey, entry.Key, entry.Value)
+	// 		if err != nil {
+	// 			panic(err)
+	// 		}
+
+	// 		counter++
+	// 		if counter%ImportCommitBatchSize == 0 {
+	// 			if err := batch.Write(); err != nil {
+	// 				panic(err)
+	// 			}
+
+	// 			batch, err = NewBatch(db.storage, version)
+	// 			if err != nil {
+	// 				panic(err)
+	// 			}
+	// 		}
+	// 	}
+
+	// 	if batch.Size() > 0 {
+	// 		if err := batch.Write(); err != nil {
+	// 			panic(err)
+	// 		}
+	// 	}
+	// }
+
+	// wg.Add(db.config.ImportNumWorkers)
+	// for i := 0; i < db.config.ImportNumWorkers; i++ {
+	// 	go worker()
+	// }
+
+	// wg.Wait()
+
+	// return nil
+}
+
+func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error {
 	var wg sync.WaitGroup
 
 	worker := func() {
 		defer wg.Done()
-		batch, err := NewBatch(db.storage, version)
+		batch, err := NewRawBatch(db.storage)
 		if err != nil {
 			panic(err)
 		}
 
 		var counter int
 		for entry := range ch {
-			err := batch.Set(entry.StoreKey, entry.Key, entry.Value)
+			err := batch.Set(entry.StoreKey, entry.Key, entry.Value, entry.Version)
 			if err != nil {
 				panic(err)
 			}
@@ -518,7 +576,7 @@ func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error {
 					panic(err)
 				}
 
-				batch, err = NewBatch(db.storage, version)
+				batch, err = NewRawBatch(db.storage)
 				if err != nil {
 					panic(err)
 				}
diff --git a/ss/types/store.go b/ss/types/store.go
index 171fcfb..4a580f9 100644
--- a/ss/types/store.go
+++ b/ss/types/store.go
@@ -30,6 +30,9 @@ type StateStore interface {
 	// Import the initial state of the store
 	Import(version int64, ch <-chan SnapshotNode) error
 
+	// Import the kv entries into the store in any order of version
+	RawImport(ch <-chan RawSnapshotNode) error
+
 	// Prune attempts to prune all versions up to and including the provided
 	// version argument. The operation should be idempotent. An error should be
 	// returned upon failure.
@@ -73,3 +76,19 @@ type SnapshotNode struct {
 	Key   []byte
 	Value []byte
 }
+
+type RawSnapshotNode struct {
+	StoreKey string
+	Key      []byte
+	Value    []byte
+	Version  int64
+}
+
+func GetRawSnapshotNode(node SnapshotNode, version int64) RawSnapshotNode {
+	return RawSnapshotNode{
+		StoreKey: node.StoreKey,
+		Key:      node.Key,
+		Value:    node.Value,
+		Version:  version,
+	}
+}
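
Note (illustrative, not part of the patch series): [PATCH 1/5] above adds RawImport to the
StateStore interface so that KV entries carrying their own version can be written in any
order. A minimal sketch of how a caller might drive it follows; the package name, function
name, channel buffer size, the "store1"/key/value contents, and the sei-db module path in
the import are assumptions rather than code from this series.

package example

import (
	"fmt"

	// Module path assumed; point this at wherever ss/types lives in your build.
	"github.com/sei-protocol/sei-db/ss/types"
)

// restoreOutOfOrder streams version-tagged entries into RawImport. Unlike
// Import, which pins every entry to one snapshot version, RawImport accepts
// entries whose versions arrive in any order.
func restoreOutOfOrder(store types.StateStore) error {
	ch := make(chan types.RawSnapshotNode, 16)
	go func() {
		defer close(ch)
		for _, v := range []int64{7, 3, 9, 1} { // deliberately unsorted versions
			ch <- types.RawSnapshotNode{
				StoreKey: "store1",
				Key:      []byte(fmt.Sprintf("key%03d", v)),
				Value:    []byte(fmt.Sprintf("value%03d", v)),
				Version:  v,
			}
		}
	}()
	return store.RawImport(ch)
}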

From 3a01e9ac26ef3e7a5ff3e4b282b634e3ae29f2f9 Mon Sep 17 00:00:00 2001
From: kbhat1
Date: Tue, 2 Jul 2024 17:13:43 -0400
Subject: [PATCH 2/5] Add comment

---
 ss/pebbledb/db.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go
index f44a4b0..156cc21 100644
--- a/ss/pebbledb/db.go
+++ b/ss/pebbledb/db.go
@@ -507,6 +507,8 @@ func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error {
 
 	return db.RawImport(rawCh)
 
+	// TODO: Revert. This is just for testing RawImport End to End temporarily
+
 	// var wg sync.WaitGroup
 
 	// worker := func() {

From a7ea824bb9484a64902985c2f4c8cf365f4b43fe Mon Sep 17 00:00:00 2001
From: kbhat1
Date: Mon, 15 Jul 2024 09:10:52 -0400
Subject: [PATCH 3/5] Revert changes to Import

---
 ss/pebbledb/db.go | 95 ++++++++++++++++++++---------------
 1 file changed, 41 insertions(+), 54 deletions(-)

diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go
index 156cc21..6f6ffe0 100644
--- a/ss/pebbledb/db.go
+++ b/ss/pebbledb/db.go
@@ -496,63 +496,50 @@ func (db *Database) ReverseIterator(storeKey string, version int64, start, end [
 // Import loads the initial version of the state in parallel with numWorkers goroutines
 // TODO: Potentially add retries instead of panics
 func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error {
-	// Re route to RawImport
-	rawCh := make(chan types.RawSnapshotNode, db.config.ImportNumWorkers)
-	go func() {
-		defer close(rawCh)
+	var wg sync.WaitGroup
+
+	worker := func() {
+		defer wg.Done()
+		batch, err := NewBatch(db.storage, version)
+		if err != nil {
+			panic(err)
+		}
+
+		var counter int
 		for entry := range ch {
-			rawCh <- types.GetRawSnapshotNode(entry, version)
+			err := batch.Set(entry.StoreKey, entry.Key, entry.Value)
+			if err != nil {
+				panic(err)
+			}
+
+			counter++
+			if counter%ImportCommitBatchSize == 0 {
+				if err := batch.Write(); err != nil {
+					panic(err)
+				}
+
+				batch, err = NewBatch(db.storage, version)
+				if err != nil {
+					panic(err)
+				}
+			}
 		}
-	}()
 
-	return db.RawImport(rawCh)
-
-	// TODO: Revert. This is just for testing RawImport End to End temporarily
-
-	// var wg sync.WaitGroup
-
-	// worker := func() {
-	// 	defer wg.Done()
-	// 	batch, err := NewBatch(db.storage, version)
-	// 	if err != nil {
-	// 		panic(err)
-	// 	}
-
-	// 	var counter int
-	// 	for entry := range ch {
-	// 		err := batch.Set(entry.StoreKey, entry.Key, entry.Value)
-	// 		if err != nil {
-	// 			panic(err)
-	// 		}
-
-	// 		counter++
-	// 		if counter%ImportCommitBatchSize == 0 {
-	// 			if err := batch.Write(); err != nil {
-	// 				panic(err)
-	// 			}
-
-	// 			batch, err = NewBatch(db.storage, version)
-	// 			if err != nil {
-	// 				panic(err)
-	// 			}
-	// 		}
-	// 	}
-
-	// 	if batch.Size() > 0 {
-	// 		if err := batch.Write(); err != nil {
-	// 			panic(err)
-	// 		}
-	// 	}
-	// }
-
-	// wg.Add(db.config.ImportNumWorkers)
-	// for i := 0; i < db.config.ImportNumWorkers; i++ {
-	// 	go worker()
-	// }
-
-	// wg.Wait()
-
-	// return nil
+		if batch.Size() > 0 {
+			if err := batch.Write(); err != nil {
+				panic(err)
+			}
+		}
+	}
+
+	wg.Add(db.config.ImportNumWorkers)
+	for i := 0; i < db.config.ImportNumWorkers; i++ {
+		go worker()
+	}
+
+	wg.Wait()
+
+	return nil
 }
 
 func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error {
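
Note (illustrative, not part of the patch series): [PATCH 3/5] above restores the original
parallel Import implementation, but the bridge that [PATCH 1/5] briefly wired into Import can
still be written as a caller-side helper: tag each SnapshotNode with the snapshot's single
version via GetRawSnapshotNode and feed the result to RawImport. The helper name and buffer
size below are assumptions; the sketch reuses the types import from the previous example and
mirrors the reverted code rather than anything that remains in the series.

// importViaRaw adapts a single-version snapshot stream onto RawImport,
// mirroring the code removed by [PATCH 3/5].
func importViaRaw(store types.StateStore, version int64, ch <-chan types.SnapshotNode) error {
	rawCh := make(chan types.RawSnapshotNode, 16)
	go func() {
		defer close(rawCh)
		for entry := range ch {
			rawCh <- types.GetRawSnapshotNode(entry, version)
		}
	}()
	return store.RawImport(rawCh)
}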

From c25174bb1dc8d75dd7055321323afc8a5356b4e9 Mon Sep 17 00:00:00 2001
From: kbhat1
Date: Mon, 15 Jul 2024 09:48:33 -0400
Subject: [PATCH 4/5] Add unit tests for Import and RawImport

---
 ss/test/storage_test_suite.go | 63 +++++++++++++++++++++++++++++++++++
 1 file changed, 63 insertions(+)

diff --git a/ss/test/storage_test_suite.go b/ss/test/storage_test_suite.go
index 55205a9..b87a480 100644
--- a/ss/test/storage_test_suite.go
+++ b/ss/test/storage_test_suite.go
@@ -1192,3 +1192,66 @@ func (s *StorageTestSuite) TestDatabaseParallelIterationVersions() {
 	close(triggerStartCh)
 	wg.Wait()
 }
+
+func (s *StorageTestSuite) TestDatabaseImport() {
+	db, err := s.NewDB(s.T().TempDir(), s.Config)
+	s.Require().NoError(err)
+	defer db.Close()
+
+	ch := make(chan types.SnapshotNode, 10)
+	go func() {
+		for i := 0; i < 10; i++ {
+			ch <- types.SnapshotNode{
+				StoreKey: "store1",
+				Key:      []byte(fmt.Sprintf("key%03d", i)),
+				Value:    []byte(fmt.Sprintf("value%03d", i)),
+			}
+		}
+		close(ch)
+	}()
+
+	s.Require().NoError(db.Import(1, ch))
+
+	for i := 0; i < 10; i++ {
+		val, err := db.Get("store1", 1, []byte(fmt.Sprintf("key%03d", i)))
+		s.Require().NoError(err)
+		s.Require().Equal([]byte(fmt.Sprintf("value%03d", i)), val)
+	}
+}
+
+func (s *StorageTestSuite) TestDatabaseRawImport() {
+	db, err := s.NewDB(s.T().TempDir(), s.Config)
+	s.Require().NoError(err)
+	defer db.Close()
+
+	ch := make(chan types.RawSnapshotNode, 10)
+	var wg sync.WaitGroup
+
+	// Launch goroutine for each version
+	for i := 10; i >= 0; i-- {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done() // Decrement the counter when the goroutine completes
+			ch <- types.RawSnapshotNode{
+				StoreKey: "store1",
+				Key:      []byte(fmt.Sprintf("key%03d", i)),
+				Value:    []byte(fmt.Sprintf("value%03d", i)),
+				Version:  int64(i + 1),
+			}
+		}(i)
+	}
+
+	go func() {
+		wg.Wait()
+		close(ch)
+	}()
+
+	// Execute the import
+	s.Require().NoError(db.RawImport(ch))
+
+	for i := 0; i <= 10; i++ {
+		val, err := db.Get("store1", int64(i+1), []byte(fmt.Sprintf("key%03d", i)))
+		s.Require().NoError(err)
+		s.Require().Equal([]byte(fmt.Sprintf("value%03d", i)), val)
+	}
+}

From da2ab01db357e8872f426169590568b71c1839d8 Mon Sep 17 00:00:00 2001
From: kbhat1
Date: Mon, 15 Jul 2024 09:53:00 -0400
Subject: [PATCH 5/5] Update comment

---
 ss/test/storage_test_suite.go | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/ss/test/storage_test_suite.go b/ss/test/storage_test_suite.go
index b87a480..8ce2f7b 100644
--- a/ss/test/storage_test_suite.go
+++ b/ss/test/storage_test_suite.go
@@ -1227,11 +1227,11 @@ func (s *StorageTestSuite) TestDatabaseRawImport() {
 	ch := make(chan types.RawSnapshotNode, 10)
 	var wg sync.WaitGroup
 
-	// Launch goroutine for each version
+	// Write versions in reverse order intentionally
 	for i := 10; i >= 0; i-- {
 		wg.Add(1)
 		go func(i int) {
-			defer wg.Done() // Decrement the counter when the goroutine completes
+			defer wg.Done()
 			ch <- types.RawSnapshotNode{
 				StoreKey: "store1",
 				Key:      []byte(fmt.Sprintf("key%03d", i)),
@@ -1246,7 +1246,6 @@ func (s *StorageTestSuite) TestDatabaseRawImport() {
 	go func() {
 		wg.Wait()
 		close(ch)
 	}()
 
-	// Execute the import
 	s.Require().NoError(db.RawImport(ch))
 
 	for i := 0; i <= 10; i++ {