Skip to content

Commit

Permalink
Merge pull request #66 from sei-protocol/yzang/SEI-7639
Browse files Browse the repository at this point in the history
Add pruning and WAL capability for SS store
  • Loading branch information
yzang2019 authored Jun 26, 2024
2 parents 50e236f + aaf6f86 commit 498ee09
Show file tree
Hide file tree
Showing 16 changed files with 257 additions and 34 deletions.
4 changes: 2 additions & 2 deletions common/utils/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ func GetStateStorePath(homePath string, backend string) string {
return filepath.Join(homePath, "data", backend)
}

func GetChangelogPath(commitStorePath string) string {
return filepath.Join(commitStorePath, "changelog")
func GetChangelogPath(dbPath string) string {
return filepath.Join(dbPath, "changelog")
}
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type StateStoreConfig struct {
// default to empty
DBDirectory string `mapstructure:"db-directory"`

// DedicatedChangelog defines if we should use a separate changelog for SS store other than sharing with SC
DedicatedChangelog bool `mapstructure:"dedicated-changelog"`

// Backend defines the backend database used for state-store
// Supported backends: pebbledb, rocksdb
// defaults to pebbledb
Expand Down
91 changes: 86 additions & 5 deletions ss/pebbledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ import (
"math"
"strings"
"sync"
"time"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/bloom"
errorutils "github.com/sei-protocol/sei-db/common/errors"
"github.com/sei-protocol/sei-db/common/logger"
"github.com/sei-protocol/sei-db/common/utils"
"github.com/sei-protocol/sei-db/config"
"github.com/sei-protocol/sei-db/proto"
"github.com/sei-protocol/sei-db/ss/types"
"github.com/sei-protocol/sei-db/stream/changelog"
"golang.org/x/exp/slices"
)

Expand All @@ -40,14 +44,26 @@ var (
)

type Database struct {
storage *pebble.DB
config config.StateStoreConfig
storage *pebble.DB
asyncWriteWG sync.WaitGroup
config config.StateStoreConfig
// Earliest version for db after pruning
earliestVersion int64

// Map of module to when each was last updated
// Used in pruning to skip over stores that have not been updated recently
storeKeyDirty sync.Map

// Changelog used to support async write
streamHandler *changelog.Stream

// Pending changes to be written to the DB
pendingChanges chan VersionedChangesets
}

type VersionedChangesets struct {
Version int64
Changesets []*proto.NamedChangeSet
}

func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
Expand Down Expand Up @@ -93,12 +109,28 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
if err != nil {
return nil, fmt.Errorf("failed to open PebbleDB: %w", err)
}

return &Database{
database := &Database{
storage: db,
asyncWriteWG: sync.WaitGroup{},
config: config,
earliestVersion: earliestVersion,
}, nil
pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer),
}
if config.DedicatedChangelog {
streamHandler, _ := changelog.NewStream(
logger.NewNopLogger(),
utils.GetChangelogPath(dataDir),
changelog.Config{
DisableFsync: true,
ZeroCopy: true,
KeepRecent: uint64(config.KeepRecent),
PruneInterval: 300 * time.Second,
},
)
database.streamHandler = streamHandler
go database.writeAsyncInBackground()
}
return database, nil
}

func NewWithDB(storage *pebble.DB) *Database {
Expand All @@ -108,6 +140,13 @@ func NewWithDB(storage *pebble.DB) *Database {
}

func (db *Database) Close() error {
if db.streamHandler != nil {
db.streamHandler.Close()
db.streamHandler = nil
close(db.pendingChanges)
}
// Wait for the async writes to finish
db.asyncWriteWG.Wait()
err := db.storage.Close()
db.storage = nil
return err
Expand Down Expand Up @@ -252,6 +291,48 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro
return b.Write()
}

func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error {
// Write to WAL first
if db.streamHandler != nil {
entry := proto.ChangelogEntry{
Version: version,
}
entry.Changesets = changesets
entry.Upgrades = nil
err := db.streamHandler.WriteNextEntry(entry)
if err != nil {
return err
}
}
// Then write to pending changes
db.pendingChanges <- VersionedChangesets{
Version: version,
Changesets: changesets,
}
return nil
}

func (db *Database) writeAsyncInBackground() {
db.asyncWriteWG.Add(1)
defer db.asyncWriteWG.Done()
for nextChange := range db.pendingChanges {
if db.streamHandler != nil {
version := nextChange.Version
for _, cs := range nextChange.Changesets {
err := db.ApplyChangeset(version, cs)
if err != nil {
panic(err)
}
}
err := db.SetLatestVersion(version)
if err != nil {
panic(err)
}
}
}

}

// Prune attempts to prune all versions up to and including the current version
// Get the range of keys, manually iterate over them and delete them
// We add a heuristic to skip over a module's keys during pruning if it hasn't been updated
Expand Down
4 changes: 2 additions & 2 deletions ss/pebbledb_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (

func init() {
initializer := func(dir string, configs config.StateStoreConfig) (types.StateStore, error) {
dbHome := dir
dbHome := utils.GetStateStorePath(dir, configs.Backend)
if configs.DBDirectory != "" {
dbHome = configs.DBDirectory
}
return pebbledb.New(utils.GetStateStorePath(dbHome, configs.Backend), configs)
return pebbledb.New(dbHome, configs)
}
RegisterBackend(PebbleDBBackend, initializer)
}
4 changes: 4 additions & 0 deletions ss/rocksdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro
return b.Write()
}

func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error {
return fmt.Errorf("not implemented")
}

// Prune attempts to prune all versions up to and including the provided version.
// This is done internally by updating the full_history_ts_low RocksDB value on
// the column families, s.t. all versions less than full_history_ts_low will be
Expand Down
4 changes: 4 additions & 0 deletions ss/sqlite/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro
return b.Write()
}

func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error {
return fmt.Errorf("not implemented")
}

func (db *Database) Prune(version int64) error {
stmt := "DELETE FROM state_storage WHERE version <= ? AND store_key != ?;"

Expand Down
46 changes: 32 additions & 14 deletions ss/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/sei-protocol/sei-db/common/utils"
"github.com/sei-protocol/sei-db/config"
"github.com/sei-protocol/sei-db/proto"
"github.com/sei-protocol/sei-db/ss/pruning"
"github.com/sei-protocol/sei-db/ss/types"
"github.com/sei-protocol/sei-db/stream/changelog"
)
Expand Down Expand Up @@ -34,32 +35,39 @@ func RegisterBackend(backendType BackendType, initializer BackendInitializer) {
}

// NewStateStore Create a new state store with the specified backend type
func NewStateStore(homeDir string, ssConfig config.StateStoreConfig) (types.StateStore, error) {
func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateStoreConfig) (types.StateStore, error) {
initializer, ok := backends[BackendType(ssConfig.Backend)]
if !ok {
return nil, fmt.Errorf("unsupported backend: %s", ssConfig.Backend)
}
db, err := initializer(homeDir, ssConfig)
stateStore, err := initializer(homeDir, ssConfig)
if err != nil {
return nil, err
}
return db, nil
// Handle auto recovery for DB running with async mode
if ssConfig.DedicatedChangelog {
changelogPath := utils.GetChangelogPath(utils.GetStateStorePath(homeDir, ssConfig.Backend))
err := RecoverStateStore(logger, changelogPath, stateStore)
if err != nil {
return nil, err
}
}
// Start the pruning manager for DB
pruningManager := pruning.NewPruningManager(logger, stateStore, int64(ssConfig.KeepRecent), int64(ssConfig.PruneIntervalSeconds))
pruningManager.Start()
return stateStore, nil
}

// RecoverStateStore will be called during initialization to recover the state from rlog
func RecoverStateStore(homePath string, logger logger.Logger, stateStore types.StateStore) error {
func RecoverStateStore(logger logger.Logger, changelogPath string, stateStore types.StateStore) error {
ssLatestVersion, err := stateStore.GetLatestVersion()
if err != nil {
return err
}
if ssLatestVersion <= 0 {
return nil
}
streamHandler, err := changelog.NewStream(
logger,
utils.GetChangelogPath(utils.GetChangelogPath(utils.GetCommitStorePath(homePath))),
changelog.Config{},
)
streamHandler, err := changelog.NewStream(logger, changelogPath, changelog.Config{})
if err != nil {
return err
}
Expand All @@ -71,16 +79,26 @@ func RecoverStateStore(homePath string, logger logger.Logger, stateStore types.S
if lastOffset <= 0 || errLast != nil {
return err
}
firstEntry, errRead := streamHandler.ReadAt(firstOffset)
lastEntry, errRead := streamHandler.ReadAt(lastOffset)
if errRead != nil {
return err
}
firstVersion := firstEntry.Version
delta := uint64(firstVersion) - firstOffset
targetStartOffset := uint64(ssLatestVersion) - delta
// Look backward to find where we should start replay from
curVersion := lastEntry.Version
curOffset := lastOffset
for curVersion > ssLatestVersion && curOffset > firstOffset {
curOffset--
curEntry, errRead := streamHandler.ReadAt(curOffset)
if errRead != nil {
return err
}
curVersion = curEntry.Version
}
// Replay from the offset where the offset where the version is larger than SS store latest version
targetStartOffset := curOffset
logger.Info(fmt.Sprintf("Start replaying changelog to recover StateStore from offset %d to %d", targetStartOffset, lastOffset))
if targetStartOffset < lastOffset {
return streamHandler.Replay(targetStartOffset+1, lastOffset, func(index uint64, entry proto.ChangelogEntry) error {
return streamHandler.Replay(targetStartOffset, lastOffset, func(index uint64, entry proto.ChangelogEntry) error {
// commit to state store
for _, cs := range entry.Changesets {
if err := stateStore.ApplyChangeset(entry.Version, cs); err != nil {
Expand Down
58 changes: 58 additions & 0 deletions ss/store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package ss

import (
"fmt"
"os"
"testing"

"github.com/cosmos/iavl"
"github.com/sei-protocol/sei-db/common/logger"
"github.com/sei-protocol/sei-db/config"
"github.com/sei-protocol/sei-db/proto"
"github.com/stretchr/testify/require"
)

func TestNewStateStore(t *testing.T) {
tempDir := os.TempDir()
ssConfig := config.StateStoreConfig{
DedicatedChangelog: true,
Backend: string(PebbleDBBackend),
AsyncWriteBuffer: 10,
KeepRecent: 100,
}
stateStore, err := NewStateStore(logger.NewNopLogger(), tempDir, ssConfig)
require.NoError(t, err)
for i := 1; i < 10; i++ {
var changesets []*proto.NamedChangeSet
kvPair := &iavl.KVPair{
Delete: false,
Key: []byte(fmt.Sprintf("key%d", i)),
Value: []byte(fmt.Sprintf("value%d", i)),
}
var pairs []*iavl.KVPair
pairs = append(pairs, kvPair)
cs := iavl.ChangeSet{Pairs: pairs}
ncs := &proto.NamedChangeSet{
Name: "storeA",
Changeset: cs,
}
changesets = append(changesets, ncs)
err := stateStore.ApplyChangesetAsync(int64(i), changesets)
require.NoError(t, err)
}
// Closing the state store without waiting for data to be fully flushed
err = stateStore.Close()
require.NoError(t, err)

// Reopen a new state store
stateStore, err = NewStateStore(logger.NewNopLogger(), tempDir, ssConfig)
require.NoError(t, err)

// Make sure key and values can be found
for i := 1; i < 10; i++ {
value, err := stateStore.Get("storeA", int64(i), []byte(fmt.Sprintf("key%d", i)))
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("value%d", i), string(value))
}

}
3 changes: 3 additions & 0 deletions ss/types/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type StateStore interface {
// the version should be latest version plus one.
ApplyChangeset(version int64, cs *proto.NamedChangeSet) error

// ApplyChangesetAsync Write changesets into WAL file first and apply later for async writes
ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error

// Import the initial state of the store
Import(version int64, ch <-chan SnapshotNode) error

Expand Down
Loading

0 comments on commit 498ee09

Please sign in to comment.