Skip to content

Commit

Permalink
Add pruning and WAL capability for SS store
Browse files Browse the repository at this point in the history
  • Loading branch information
yzang2019 committed Jun 21, 2024
1 parent 2f1baf4 commit 220d9d5
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 20 deletions.
4 changes: 2 additions & 2 deletions common/utils/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ func GetStateStorePath(homePath string, backend string) string {
return filepath.Join(homePath, "data", backend)
}

func GetChangelogPath(commitStorePath string) string {
return filepath.Join(commitStorePath, "changelog")
// GetChangelogPath returns the path of the "changelog" directory that lives
// directly under dbPath.
func GetChangelogPath(dbPath string) string {
	const changelogDir = "changelog"
	changelogPath := filepath.Join(dbPath, changelogDir)
	return changelogPath
}
5 changes: 4 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ type StateStoreConfig struct {
// DBDirectory defines the directory to store the state store db files
// If not explicitly set, default to application home directory
// default to empty
DBDirectory string `mapstructure:"db-directory"`
DBDirectory string `mapstructure:"db-dir"`

// DedicatedChangelog defines whether the SS store should use its own separate changelog rather than sharing one with SC
DedicatedChangelog bool `mapstructure:"dedicated-changelog"`

// Backend defines the backend database used for state-store
// Supported backends: pebbledb, rocksdb
Expand Down
70 changes: 67 additions & 3 deletions ss/pebbledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/sei-protocol/sei-db/common/logger"
"github.com/sei-protocol/sei-db/common/utils"
"github.com/sei-protocol/sei-db/stream/changelog"
"math"
"strings"
"sync"
Expand Down Expand Up @@ -48,6 +51,17 @@ type Database struct {
// Map of module to when each was last updated
// Used in pruning to skip over stores that have not been updated recently
storeKeyDirty sync.Map

// Changelog used to support async write
streamHandler *changelog.Stream

// Pending changes to be written to the DB
pendingChanges chan VersionedChangesets
}

// VersionedChangesets groups the changesets of a single version so they can be
// queued on Database.pendingChanges and applied together by writeAsync.
type VersionedChangesets struct {
// Version is the version under which every changeset below is applied.
Version int64
// Changesets are the named changesets to apply for Version.
Changesets []*proto.NamedChangeSet
}

func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
Expand Down Expand Up @@ -93,12 +107,22 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
if err != nil {
return nil, fmt.Errorf("failed to open PebbleDB: %w", err)
}

return &Database{
database := &Database{
storage: db,
config: config,
earliestVersion: earliestVersion,
}, nil
pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer),
}
if config.DedicatedChangelog {
streamHandler, _ := changelog.NewStream(
logger.NewNopLogger(),
utils.GetChangelogPath(dataDir),
changelog.Config{DisableFsync: true, ZeroCopy: true},
)
database.streamHandler = streamHandler
go database.writeAsync()

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
}
return database, nil
}

func NewWithDB(storage *pebble.DB) *Database {
Expand All @@ -110,6 +134,11 @@ func NewWithDB(storage *pebble.DB) *Database {
// Close shuts down the database: it stops the async write pipeline (when a
// changelog stream is attached) and then closes the underlying storage.
//
// The first error encountered is returned; a storage close error takes
// precedence over a changelog close error.
//
// NOTE(review): Close does not wait for writeAsync to finish draining
// pendingChanges, so changes still buffered there may be applied after the
// storage has been closed — confirm whether an explicit wait is required.
func (db *Database) Close() error {
	var streamErr error
	if db.streamHandler != nil {
		stream := db.streamHandler
		db.streamHandler = nil
		// Closing the channel ends the range loop in writeAsync. Do this
		// before the storage goes away so the writer is told to stop first.
		close(db.pendingChanges)
		// Previously the changelog close error was silently dropped.
		streamErr = stream.Close()
	}

	err := db.storage.Close()
	db.storage = nil
	if err != nil {
		return err
	}
	return streamErr
}

Expand Down Expand Up @@ -252,6 +281,41 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro
return b.Write()
}

// ApplyChangesetAsync records the changesets for version in the changelog
// (WAL) and then queues them on pendingChanges for writeAsync to apply.
//
// New only starts writeAsync when a dedicated changelog is configured. When
// no stream handler is attached there is nothing draining pendingChanges, and
// a send on it would block forever once the buffer is full (or immediately if
// the channel is nil). In that case the changesets are applied synchronously
// instead, following the same steps writeAsync performs.
//
// It returns the first error from the WAL write or, in the synchronous
// fallback, from ApplyChangeset.
func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error {
	if db.streamHandler == nil {
		for _, cs := range changesets {
			if err := db.ApplyChangeset(version, cs); err != nil {
				return err
			}
		}
		// Mirrors writeAsync.
		// NOTE(review): if SetLatestVersion returns an error it is dropped
		// here, exactly as in writeAsync — confirm and handle if so.
		db.SetLatestVersion(version)
		return nil
	}

	// Write to WAL first so the change can be replayed after a crash.
	entry := proto.ChangelogEntry{
		Version:    version,
		Changesets: changesets,
	}
	if err := db.streamHandler.WriteNextEntry(entry); err != nil {
		return err
	}

	// Then hand the change over to the background writer.
	db.pendingChanges <- VersionedChangesets{
		Version:    version,
		Changesets: changesets,
	}
	return nil
}

func (db *Database) writeAsync() {
for db.streamHandler != nil {
for nextChange := range db.pendingChanges {
version := nextChange.Version
for _, cs := range nextChange.Changesets {
err := db.ApplyChangeset(version, cs)
if err != nil {
panic(panic)

Check failure on line 311 in ss/pebbledb/db.go

View workflow job for this annotation

GitHub Actions / lint

panic (built-in) must be called (typecheck)

Check failure on line 311 in ss/pebbledb/db.go

View workflow job for this annotation

GitHub Actions / lint

panic (built-in) must be called) (typecheck)

Check failure on line 311 in ss/pebbledb/db.go

View workflow job for this annotation

GitHub Actions / Analyze

panic (built-in) must be called

Check failure on line 311 in ss/pebbledb/db.go

View workflow job for this annotation

GitHub Actions / tests

panic (built-in) must be called
}
}
db.SetLatestVersion(version)
}
}
}

// Prune attempts to prune all versions up to and including the current version
// Get the range of keys, manually iterate over them and delete them
// We add a heuristic to skip over a module's keys during pruning if it hasn't been updated
Expand Down
45 changes: 31 additions & 14 deletions ss/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ss

import (
"fmt"
"github.com/sei-protocol/sei-db/ss/pruning"

"github.com/sei-protocol/sei-db/common/logger"
"github.com/sei-protocol/sei-db/common/utils"
Expand Down Expand Up @@ -34,32 +35,39 @@ func RegisterBackend(backendType BackendType, initializer BackendInitializer) {
}

// NewStateStore Create a new state store with the specified backend type
func NewStateStore(homeDir string, ssConfig config.StateStoreConfig) (types.StateStore, error) {
func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateStoreConfig) (types.StateStore, error) {
initializer, ok := backends[BackendType(ssConfig.Backend)]
if !ok {
return nil, fmt.Errorf("unsupported backend: %s", ssConfig.Backend)
}
db, err := initializer(homeDir, ssConfig)
stateStore, err := initializer(homeDir, ssConfig)
if err != nil {
return nil, err
}
return db, nil
// Handle auto recovery for DB running with async mode
if ssConfig.DedicatedChangelog {
changelogPath := utils.GetChangelogPath(utils.GetStateStorePath(homeDir, ssConfig.Backend))
err := RecoverStateStore(logger, changelogPath, stateStore)
if err != nil {
return nil, err
}
}
// Start the pruning manager for DB
pruningManager := pruning.NewPruningManager(logger, stateStore, int64(ssConfig.KeepRecent), int64(ssConfig.PruneIntervalSeconds))
pruningManager.Start()
return stateStore, nil
}

// RecoverStateStore will be called during initialization to recover the state from rlog
func RecoverStateStore(homePath string, logger logger.Logger, stateStore types.StateStore) error {
func RecoverStateStore(logger logger.Logger, changelogPath string, stateStore types.StateStore) error {
ssLatestVersion, err := stateStore.GetLatestVersion()
if err != nil {
return err
}
if ssLatestVersion <= 0 {
return nil
}
streamHandler, err := changelog.NewStream(
logger,
utils.GetChangelogPath(utils.GetChangelogPath(utils.GetCommitStorePath(homePath))),
changelog.Config{},
)
streamHandler, err := changelog.NewStream(logger, changelogPath, changelog.Config{})
if err != nil {
return err
}
Expand All @@ -71,16 +79,25 @@ func RecoverStateStore(homePath string, logger logger.Logger, stateStore types.S
if lastOffset <= 0 || errLast != nil {
return err
}
firstEntry, errRead := streamHandler.ReadAt(firstOffset)
lastEntry, errRead := streamHandler.ReadAt(lastOffset)
if errRead != nil {
return err
}
firstVersion := firstEntry.Version
delta := uint64(firstVersion) - firstOffset
targetStartOffset := uint64(ssLatestVersion) - delta
// Look backward to find where we should start replay from
curVersion := lastEntry.Version
curOffset := lastOffset
for curVersion > ssLatestVersion && curOffset >= firstOffset {
curOffset--
curEntry, errRead := streamHandler.ReadAt(curOffset)
if errRead != nil {
return err
}
curVersion = curEntry.Version
}
targetStartOffset := curOffset
logger.Info(fmt.Sprintf("Start replaying changelog to recover StateStore from offset %d to %d", targetStartOffset, lastOffset))
if targetStartOffset < lastOffset {
return streamHandler.Replay(targetStartOffset+1, lastOffset, func(index uint64, entry proto.ChangelogEntry) error {
return streamHandler.Replay(targetStartOffset, lastOffset, func(index uint64, entry proto.ChangelogEntry) error {
// commit to state store
for _, cs := range entry.Changesets {
if err := stateStore.ApplyChangeset(entry.Version, cs); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions ss/types/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type StateStore interface {
// the version should be latest version plus one.
ApplyChangeset(version int64, cs *proto.NamedChangeSet) error

// ApplyChangesetAsync writes the changesets into the WAL file first and applies them later, for async writes
ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error

// Import the initial state of the store
Import(version int64, ch <-chan SnapshotNode) error

Expand Down
28 changes: 28 additions & 0 deletions stream/changelog/changelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Stream struct {
logger logger.Logger
writeChannel chan *Message
errSignal chan error
nextOffset uint64
}

type Message struct {
Expand All @@ -32,6 +33,7 @@ type Config struct {
DisableFsync bool
ZeroCopy bool
WriteBufferSize int
KeepLast int
}

// NewStream creates a new changelog stream that persist the changesets in the log
Expand All @@ -42,6 +44,15 @@ func NewStream(logger logger.Logger, dir string, config Config) (*Stream, error)
})
if err != nil {
return nil, err
}
firstEntry, err := log.FirstIndex()
if err != nil {
return nil, err
}
if firstEntry <= 0 {

Check failure on line 52 in stream/changelog/changelog.go

View workflow job for this annotation

GitHub Actions / lint

SA9003: empty branch (staticcheck)
}
if config.KeepLast > 0 {

Check failure on line 54 in stream/changelog/changelog.go

View workflow job for this annotation

GitHub Actions / lint

SA9003: empty branch (staticcheck)

}
return &Stream{
log: log,
Expand Down Expand Up @@ -75,6 +86,18 @@ func (stream *Stream) Write(offset uint64, entry proto.ChangelogEntry) error {
return nil
}

// WriteNextEntry writes entry at the stream's next offset and advances that
// offset only when the write succeeds; on failure the offset is unchanged and
// the error from Write is returned.
// Whether the write is blocking or asynchronous depends on the buffer size.
func (stream *Stream) WriteNextEntry(entry proto.ChangelogEntry) error {
	if writeErr := stream.Write(stream.nextOffset, entry); writeErr != nil {
		return writeErr
	}
	stream.nextOffset++
	return nil
}

// startWriteGoroutine will start a goroutine to write entries to the log.
// This should only be called on initialization if async write is enabled
func (stream *Stream) startWriteGoroutine() {
Expand Down Expand Up @@ -172,6 +195,11 @@ func (stream *Stream) Replay(start uint64, end uint64, processFn func(index uint
return nil
}

//

Check failure on line 198 in stream/changelog/changelog.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofmt`-ed with `-s` (gofmt)
// Pruning is a placeholder; it currently does nothing.
func (stream *Stream) Pruning() {}

func (stream *Stream) Close() error {
if stream.writeChannel == nil {
return nil
Expand Down

0 comments on commit 220d9d5

Please sign in to comment.