diff --git a/ss/pebbledb/batch.go b/ss/pebbledb/batch.go
index 12e6a22..d8dc003 100644
--- a/ss/pebbledb/batch.go
+++ b/ss/pebbledb/batch.go
@@ -65,3 +65,53 @@ func (b *Batch) Write() (err error) {
 
 	return b.batch.Commit(defaultWriteOpts)
 }
+
+// RawBatch writes kv pairs whose versions may arrive in any order.
+type RawBatch struct {
+	storage *pebble.DB
+	batch   *pebble.Batch
+}
+
+func NewRawBatch(storage *pebble.DB) (*RawBatch, error) {
+	batch := storage.NewBatch()
+
+	return &RawBatch{
+		storage: storage,
+		batch:   batch,
+	}, nil
+}
+
+func (b *RawBatch) Size() int {
+	return b.batch.Len()
+}
+
+func (b *RawBatch) Reset() {
+	b.batch.Reset()
+}
+
+func (b *RawBatch) set(storeKey string, tombstone int64, key, value []byte, version int64) error {
+	prefixedKey := MVCCEncode(prependStoreKey(storeKey, key), version)
+	prefixedVal := MVCCEncode(value, tombstone)
+
+	if err := b.batch.Set(prefixedKey, prefixedVal, nil); err != nil {
+		return fmt.Errorf("failed to write PebbleDB batch: %w", err)
+	}
+
+	return nil
+}
+
+func (b *RawBatch) Set(storeKey string, key, value []byte, version int64) error {
+	return b.set(storeKey, 0, key, value, version)
+}
+
+func (b *RawBatch) Delete(storeKey string, key []byte, version int64) error {
+	return b.set(storeKey, version, key, []byte(tombstoneVal), version)
+}
+
+func (b *RawBatch) Write() (err error) {
+	defer func() {
+		err = errors.Join(err, b.batch.Close())
+	}()
+
+	return b.batch.Commit(defaultWriteOpts)
+}
diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go
index 76d099c..6f6ffe0 100644
--- a/ss/pebbledb/db.go
+++ b/ss/pebbledb/db.go
@@ -542,6 +542,53 @@ func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error {
 	return nil
 }
 
+func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error {
+	var wg sync.WaitGroup
+
+	worker := func() {
+		defer wg.Done()
+		batch, err := NewRawBatch(db.storage)
+		if err != nil {
+			panic(err)
+		}
+
+		var counter int
+		for entry := range ch {
+			err := batch.Set(entry.StoreKey, entry.Key, entry.Value, entry.Version)
+			if err != nil {
+				panic(err)
+			}
+
+			counter++
+			if counter%ImportCommitBatchSize == 0 {
+				if err := batch.Write(); err != nil {
+					panic(err)
+				}
+
+				batch, err = NewRawBatch(db.storage)
+				if err != nil {
+					panic(err)
+				}
+			}
+		}
+
+		if batch.Size() > 0 {
+			if err := batch.Write(); err != nil {
+				panic(err)
+			}
+		}
+	}
+
+	wg.Add(db.config.ImportNumWorkers)
+	for i := 0; i < db.config.ImportNumWorkers; i++ {
+		go worker()
+	}
+
+	wg.Wait()
+
+	return nil
+}
+
 // RawIterate iterates over all keys and values for a store
 func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte, version int64) bool) (bool, error) {
 	// Iterate through all keys and values for a store
diff --git a/ss/test/storage_test_suite.go b/ss/test/storage_test_suite.go
index 55205a9..8ce2f7b 100644
--- a/ss/test/storage_test_suite.go
+++ b/ss/test/storage_test_suite.go
@@ -1192,3 +1192,65 @@ func (s *StorageTestSuite) TestDatabaseParallelIterationVersions() {
 	close(triggerStartCh)
 	wg.Wait()
 }
+
+func (s *StorageTestSuite) TestDatabaseImport() {
+	db, err := s.NewDB(s.T().TempDir(), s.Config)
+	s.Require().NoError(err)
+	defer db.Close()
+
+	ch := make(chan types.SnapshotNode, 10)
+	go func() {
+		for i := 0; i < 10; i++ {
+			ch <- types.SnapshotNode{
+				StoreKey: "store1",
+				Key:      []byte(fmt.Sprintf("key%03d", i)),
+				Value:    []byte(fmt.Sprintf("value%03d", i)),
+			}
+		}
+		close(ch)
+	}()
+
+	s.Require().NoError(db.Import(1, ch))
+
+	for i := 0; i < 10; i++ {
+		val, err := db.Get("store1", 1, []byte(fmt.Sprintf("key%03d", i)))
+		s.Require().NoError(err)
+		s.Require().Equal([]byte(fmt.Sprintf("value%03d", i)), val)
+	}
+}
+
+func (s *StorageTestSuite) TestDatabaseRawImport() {
+	db, err := s.NewDB(s.T().TempDir(), s.Config)
+	s.Require().NoError(err)
+	defer db.Close()
+
+	ch := make(chan types.RawSnapshotNode, 10)
+	var wg sync.WaitGroup
+
+	// Write versions in reverse order intentionally
+	for i := 10; i >= 0; i-- {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			ch <- types.RawSnapshotNode{
+				StoreKey: "store1",
+				Key:      []byte(fmt.Sprintf("key%03d", i)),
+				Value:    []byte(fmt.Sprintf("value%03d", i)),
+				Version:  int64(i + 1),
+			}
+		}(i)
+	}
+
+	go func() {
+		wg.Wait()
+		close(ch)
+	}()
+
+	s.Require().NoError(db.RawImport(ch))
+
+	for i := 0; i <= 10; i++ {
+		val, err := db.Get("store1", int64(i+1), []byte(fmt.Sprintf("key%03d", i)))
+		s.Require().NoError(err)
+		s.Require().Equal([]byte(fmt.Sprintf("value%03d", i)), val)
+	}
+}
diff --git a/ss/types/store.go b/ss/types/store.go
index 171fcfb..4a580f9 100644
--- a/ss/types/store.go
+++ b/ss/types/store.go
@@ -30,6 +30,9 @@ type StateStore interface {
 	// Import the initial state of the store
 	Import(version int64, ch <-chan SnapshotNode) error
 
+	// Import kv entries into the store in any version order
+	RawImport(ch <-chan RawSnapshotNode) error
+
 	// Prune attempts to prune all versions up to and including the provided
 	// version argument. The operation should be idempotent. An error should be
 	// returned upon failure.
@@ -73,3 +76,19 @@ type SnapshotNode struct {
 	Key      []byte
 	Value    []byte
 }
+
+type RawSnapshotNode struct {
+	StoreKey string
+	Key      []byte
+	Value    []byte
+	Version  int64
+}
+
+func GetRawSnapshotNode(node SnapshotNode, version int64) RawSnapshotNode {
+	return RawSnapshotNode{
+		StoreKey: node.StoreKey,
+		Key:      node.Key,
+		Value:    node.Value,
+		Version:  version,
+	}
+}
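
Usage sketch (illustrative only, not part of the patch): the snippet below shows how a caller might stream out-of-order versions into RawImport, and how GetRawSnapshotNode can tag an existing SnapshotNode with an explicit version. Only types.RawSnapshotNode, types.GetRawSnapshotNode, and StateStore.RawImport come from this diff; the module import path, package name, store key, and version values are assumptions made for the example.

package importexample

import (
	"fmt"

	// Import path assumed from the repository layout shown in this diff.
	"github.com/sei-protocol/sei-db/ss/types"
)

// rawImportExample streams entries to RawImport with versions deliberately
// reversed; unlike Import, RawImport does not require version-sorted input.
func rawImportExample(db types.StateStore) error {
	ch := make(chan types.RawSnapshotNode, 16)

	go func() {
		defer close(ch)
		for v := int64(10); v >= 1; v-- {
			ch <- types.RawSnapshotNode{
				StoreKey: "store1", // illustrative store key
				Key:      []byte(fmt.Sprintf("key%03d", v)),
				Value:    []byte(fmt.Sprintf("value%03d", v)),
				Version:  v,
			}
		}

		// An existing SnapshotNode can be re-tagged with a version via the
		// helper added in ss/types/store.go.
		node := types.SnapshotNode{StoreKey: "store1", Key: []byte("extra"), Value: []byte("v")}
		ch <- types.GetRawSnapshotNode(node, 11)
	}()

	return db.RawImport(ch)
}

Because RawBatch encodes the version into every key via MVCCEncode, each worker in RawImport can commit its batches independently, which is what allows the producer side above to send versions in any order.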