From 07ddda6293b9258100fa8bed89d0571ed27ff226 Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Thu, 21 Nov 2024 11:40:09 -0500 Subject: [PATCH 1/2] WIP: Version Sharding --- config/config.go | 3 + ss/pebbledb/db.go | 321 +++++++++++++++++++++++++++++++++------------- 2 files changed, 233 insertions(+), 91 deletions(-) diff --git a/config/config.go b/config/config.go index f11b0ab..cfdaf9c 100644 --- a/config/config.go +++ b/config/config.go @@ -87,6 +87,9 @@ type StateStoreConfig struct { // Whether to keep last version of a key during pruning or delete // defaults to true KeepLastVersion bool `mapstructure:"keep-last-version"` + + // Number of versions per shard + VersionShardSize int64 `mapstructure:"version-shard-size"` } func DefaultStateCommitConfig() StateCommitConfig { diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index cac2296..eb6b750 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -6,6 +6,8 @@ import ( "errors" "fmt" "math" + "os" + "path/filepath" "strings" "sync" "time" @@ -46,8 +48,14 @@ var ( defaultWriteOpts = pebble.NoSync ) +type VersionedDB struct { + startVersion int64 + endVersion int64 // exclusive + db *pebble.DB +} + type Database struct { - storage *pebble.DB + dbs []*VersionedDB asyncWriteWG sync.WaitGroup config config.StateStoreConfig // Earliest version for db after pruning @@ -70,8 +78,51 @@ type VersionedChangesets struct { } func New(dataDir string, config config.StateStoreConfig) (*Database, error) { + versionShardSize := config.VersionShardSize + database := &Database{ + dbs: []*VersionedDB{}, + config: config, + pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer), + } + + // Initialize the first shard + startVersion := int64(0) + endVersion := startVersion + versionShardSize + versionedDB, err := openVersionedDB(dataDir, startVersion, endVersion, database.getPebbleOptions()) + if err != nil { + return nil, err + } + + database.dbs = append(database.dbs, versionedDB) + + earliestVersion, err := retrieveEarliestVersion(versionedDB.db) + if err != nil { + return nil, fmt.Errorf("failed to retrieve earliest version: %w", err) + } + database.earliestVersion = earliestVersion + + if config.DedicatedChangelog { + streamHandler, _ := changelog.NewStream( + logger.NewNopLogger(), + utils.GetChangelogPath(dataDir), + changelog.Config{ + DisableFsync: true, + ZeroCopy: true, + KeepRecent: uint64(config.KeepRecent), + PruneInterval: 300 * time.Second, + }, + ) + database.streamHandler = streamHandler + go database.writeAsyncInBackground() + } + + return database, nil +} + +func (db *Database) getPebbleOptions() *pebble.Options { cache := pebble.NewCache(1024 * 1024 * 32) - defer cache.Unref() + // We should not defer cache.Unref() here because we need the cache to persist + // for the lifetime of the database opts := &pebble.Options{ Cache: cache, Comparer: MVCCComparer, @@ -91,7 +142,6 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { l.IndexBlockSize = 256 << 10 // 256 KB l.FilterPolicy = bloom.FilterPolicy(10) l.FilterType = pebble.TableFilter - // TODO: Consider compression only for specific layers like bottommost l.Compression = pebble.ZstdCompression if i > 0 { l.TargetFileSize = opts.Levels[i-1].TargetFileSize * 2 @@ -103,43 +153,23 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { opts.FlushSplitBytes = opts.Levels[0].TargetFileSize opts = opts.EnsureDefaults() - db, err := pebble.Open(dataDir, opts) - if err != nil { - return nil, fmt.Errorf("failed to open PebbleDB: %w", err) - } + return opts +} - earliestVersion, err := retrieveEarliestVersion(db) +func openVersionedDB(dataDir string, startVersion, endVersion int64, opts *pebble.Options) (*VersionedDB, error) { + dbDir := filepath.Join(dataDir, fmt.Sprintf("%d_%d", startVersion, endVersion)) + db, err := pebble.Open(dbDir, opts) if err != nil { return nil, fmt.Errorf("failed to open PebbleDB: %w", err) } - database := &Database{ - storage: db, - asyncWriteWG: sync.WaitGroup{}, - config: config, - earliestVersion: earliestVersion, - pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer), - } - if config.DedicatedChangelog { - streamHandler, _ := changelog.NewStream( - logger.NewNopLogger(), - utils.GetChangelogPath(dataDir), - changelog.Config{ - DisableFsync: true, - ZeroCopy: true, - KeepRecent: uint64(config.KeepRecent), - PruneInterval: 300 * time.Second, - }, - ) - database.streamHandler = streamHandler - go database.writeAsyncInBackground() - } - return database, nil -} -func NewWithDB(storage *pebble.DB) *Database { - return &Database{ - storage: storage, + versionedDB := &VersionedDB{ + startVersion: startVersion, + endVersion: endVersion, + db: db, } + + return versionedDB, nil } func (db *Database) Close() error { @@ -148,46 +178,58 @@ func (db *Database) Close() error { db.streamHandler = nil close(db.pendingChanges) } - // Wait for the async writes to finish db.asyncWriteWG.Wait() - err := db.storage.Close() - db.storage = nil - return err + + for _, vdb := range db.dbs { + err := vdb.db.Close() + if err != nil { + return err + } + } + db.dbs = nil + return nil } func (db *Database) SetLatestVersion(version int64) error { + currentDB, err := db.getOrCreateDBForVersion(version) + if err != nil { + return err + } var ts [VersionSize]byte binary.LittleEndian.PutUint64(ts[:], uint64(version)) - err := db.storage.Set([]byte(latestVersionKey), ts[:], defaultWriteOpts) + err = currentDB.db.Set([]byte(latestVersionKey), ts[:], defaultWriteOpts) fmt.Printf("SetLatestVersion: version=%d, err=%v, latestVersionKey=%s\n", version, err, latestVersionKey) return err } func (db *Database) GetLatestVersion() (int64, error) { - bz, closer, err := db.storage.Get([]byte(latestVersionKey)) + if len(db.dbs) == 0 { + return 0, nil + } + currentDB := db.dbs[len(db.dbs)-1] + bz, closer, err := currentDB.db.Get([]byte(latestVersionKey)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { - // in case of a fresh database return 0, nil } - return 0, err } + defer closer.Close() if len(bz) == 0 { - return 0, closer.Close() + return 0, nil } - return int64(binary.LittleEndian.Uint64(bz)), closer.Close() + return int64(binary.LittleEndian.Uint64(bz)), nil } func (db *Database) SetEarliestVersion(version int64) error { if version > db.earliestVersion { db.earliestVersion = version - + currentDB := db.dbs[0] var ts [VersionSize]byte binary.LittleEndian.PutUint64(ts[:], uint64(version)) - return db.storage.Set([]byte(earliestVersionKey), ts[:], defaultWriteOpts) + return currentDB.db.Set([]byte(earliestVersionKey), ts[:], defaultWriteOpts) } return nil } @@ -215,14 +257,16 @@ func retrieveEarliestVersion(db *pebble.DB) (int64, error) { return int64(binary.LittleEndian.Uint64(bz)), closer.Close() } -// SetLatestKey sets the latest key processed during migration. +// SetLatestMigratedKey sets the latest key processed during migration. func (db *Database) SetLatestMigratedKey(key []byte) error { - return db.storage.Set([]byte(latestMigratedKeyMetadata), key, defaultWriteOpts) + currentDB := db.dbs[len(db.dbs)-1] + return currentDB.db.Set([]byte(latestMigratedKeyMetadata), key, defaultWriteOpts) } -// GetLatestKey retrieves the latest key processed during migration. +// GetLatestMigratedKey retrieves the latest key processed during migration. func (db *Database) GetLatestMigratedKey() ([]byte, error) { - bz, closer, err := db.storage.Get([]byte(latestMigratedKeyMetadata)) + currentDB := db.dbs[len(db.dbs)-1] + bz, closer, err := currentDB.db.Get([]byte(latestMigratedKeyMetadata)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, nil @@ -230,17 +274,19 @@ func (db *Database) GetLatestMigratedKey() ([]byte, error) { return nil, err } defer closer.Close() - return bz, nil + return slices.Clone(bz), nil } -// SetLatestModule sets the latest module processed during migration. +// SetLatestMigratedModule sets the latest module processed during migration. func (db *Database) SetLatestMigratedModule(module string) error { - return db.storage.Set([]byte(latestMigratedModuleMetadata), []byte(module), defaultWriteOpts) + currentDB := db.dbs[len(db.dbs)-1] + return currentDB.db.Set([]byte(latestMigratedModuleMetadata), []byte(module), defaultWriteOpts) } -// GetLatestModule retrieves the latest module processed during migration. +// GetLatestMigratedModule retrieves the latest module processed during migration. func (db *Database) GetLatestMigratedModule() (string, error) { - bz, closer, err := db.storage.Get([]byte(latestMigratedModuleMetadata)) + currentDB := db.dbs[len(db.dbs)-1] + bz, closer, err := currentDB.db.Get([]byte(latestMigratedModuleMetadata)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return "", nil @@ -269,38 +315,47 @@ func (db *Database) Get(storeKey string, targetVersion int64, key []byte) ([]byt return nil, nil } - prefixedVal, err := getMVCCSlice(db.storage, storeKey, key, targetVersion) - if err != nil { - if errors.Is(err, errorutils.ErrRecordNotFound) { - return nil, nil + for i := len(db.dbs) - 1; i >= 0; i-- { + vdb := db.dbs[i] + if targetVersion < vdb.startVersion { + continue } - return nil, fmt.Errorf("failed to perform PebbleDB read: %w", err) - } + if targetVersion >= vdb.endVersion { + // Target version not in this shard + continue + } - valBz, tombBz, ok := SplitMVCCKey(prefixedVal) - if !ok { - return nil, fmt.Errorf("invalid PebbleDB MVCC value: %s", prefixedVal) - } + prefixedVal, err := getMVCCSlice(vdb.db, storeKey, key, targetVersion) + if err != nil { + if errors.Is(err, errorutils.ErrRecordNotFound) { + continue + } - // A tombstone of zero or a target version that is less than the tombstone - // version means the key is not deleted at the target version. - if len(tombBz) == 0 { - return valBz, nil - } + return nil, fmt.Errorf("failed to perform PebbleDB read: %w", err) + } - tombstone, err := decodeUint64Ascending(tombBz) - if err != nil { - return nil, fmt.Errorf("failed to decode value tombstone: %w", err) - } + valBz, tombBz, ok := SplitMVCCKey(prefixedVal) + if !ok { + return nil, fmt.Errorf("invalid PebbleDB MVCC value: %s", prefixedVal) + } + + if len(tombBz) == 0 { + return valBz, nil + } - // A tombstone of zero or a target version that is less than the tombstone - // version means the key is not deleted at the target version. - if targetVersion < tombstone { - return valBz, nil + tombstone, err := decodeUint64Ascending(tombBz) + if err != nil { + return nil, fmt.Errorf("failed to decode value tombstone: %w", err) + } + + if targetVersion < tombstone { + return valBz, nil + } + + return nil, nil } - // the value is considered deleted return nil, nil } @@ -312,7 +367,12 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro version = 1 } - b, err := NewBatch(db.storage, version) + currentDB, err := db.getOrCreateDBForVersion(version) + if err != nil { + return err + } + + b, err := NewBatch(currentDB.db, version) if err != nil { return err } @@ -385,15 +445,51 @@ func (db *Database) writeAsyncInBackground() { // it has been updated. This occurs when that module's keys are updated in between pruning runs, the node after is restarted. // This is not a large issue given the next time that module is updated, it will be properly pruned thereafter. func (db *Database) Prune(version int64) error { - earliestVersion := version + 1 // we increment by 1 to include the provided version + earliestVersion := version + 1 + + if earliestVersion > db.earliestVersion { + db.earliestVersion = earliestVersion + } + + var newDBs []*VersionedDB + for _, vdb := range db.dbs { + if vdb.endVersion <= earliestVersion { + // Close and delete the entire shard + err := vdb.db.Close() + if err != nil { + return err + } + dbDir := filepath.Join(db.dataDir, fmt.Sprintf("%d_%d", vdb.startVersion, vdb.endVersion)) + err = os.RemoveAll(dbDir) + if err != nil { + return err + } + continue + } else if vdb.startVersion >= earliestVersion { + newDBs = append(newDBs, vdb) + } else { + // Partial overlap, prune keys in this shard + err := pruneVersionedDB(db, vdb, earliestVersion) + if err != nil { + return err + } + newDBs = append(newDBs, vdb) + } + } - itr, err := db.storage.NewIter(nil) + db.dbs = newDBs + + return nil +} + +func pruneVersionedDB(db *Database, vdb *VersionedDB, earliestVersion int64) error { + itr, err := vdb.db.NewIter(nil) if err != nil { return err } defer itr.Close() - batch := db.storage.NewBatch() + batch := vdb.db.NewBatch() defer batch.Close() var ( @@ -420,7 +516,7 @@ func (db *Database) Prune(version int64) error { storeKey, err := parseStoreKey(currKey) if err != nil { - // XXX: This should never happen given we skip the metadata keys. + // This should never happen given we skip the metadata keys. return err } @@ -443,7 +539,7 @@ func (db *Database) Prune(version int64) error { // Seek to next key if we are at a version which is higher than prune height // Do not seek to next key if KeepLastVersion is false and we need to delete the previous key in pruning - if currVersionDecoded > version && (db.config.KeepLastVersion || prevVersionDecoded > version) { + if currVersionDecoded > earliestVersion && (db.config.KeepLastVersion || prevVersionDecoded > earliestVersion) { itr.NextPrefix() continue } @@ -451,7 +547,7 @@ func (db *Database) Prune(version int64) error { // Delete a key if another entry for that key exists at a larger version than original but leq to the prune height // Also delete a key if it has been tombstoned and its version is leq to the prune height // Also delete a key if KeepLastVersion is false and version is leq to the prune height - if prevVersionDecoded <= version && (bytes.Equal(prevKey, currKey) || valTombstoned(prevValEncoded) || !db.config.KeepLastVersion) { + if prevVersionDecoded <= earliestVersion && (bytes.Equal(prevKey, currKey) || valTombstoned(prevValEncoded) || !db.config.KeepLastVersion) { err = batch.Delete(prevKeyEncoded, nil) if err != nil { return err @@ -486,7 +582,7 @@ func (db *Database) Prune(version int64) error { } } - return db.SetEarliestVersion(earliestVersion) + return nil } func (db *Database) Iterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) { @@ -498,6 +594,11 @@ func (db *Database) Iterator(storeKey string, version int64, start, end []byte) return nil, errorutils.ErrStartAfterEnd } + vdb, err := db.getDBForVersion(version) + if err != nil { + return nil, err + } + lowerBound := MVCCEncode(prependStoreKey(storeKey, start), 0) var upperBound []byte @@ -505,7 +606,7 @@ func (db *Database) Iterator(storeKey string, version int64, start, end []byte) upperBound = MVCCEncode(prependStoreKey(storeKey, end), 0) } - itr, err := db.storage.NewIter(&pebble.IterOptions{LowerBound: lowerBound, UpperBound: upperBound}) + itr, err := vdb.storage.NewIter(&pebble.IterOptions{LowerBound: lowerBound, UpperBound: upperBound}) if err != nil { return nil, fmt.Errorf("failed to create PebbleDB iterator: %w", err) } @@ -522,6 +623,11 @@ func (db *Database) ReverseIterator(storeKey string, version int64, start, end [ return nil, errorutils.ErrStartAfterEnd } + vdb, err := db.getDBForVersion(version) + if err != nil { + return nil, err + } + lowerBound := MVCCEncode(prependStoreKey(storeKey, start), 0) var upperBound []byte @@ -529,7 +635,7 @@ func (db *Database) ReverseIterator(storeKey string, version int64, start, end [ upperBound = MVCCEncode(prependStoreKey(storeKey, end), 0) } - itr, err := db.storage.NewIter(&pebble.IterOptions{LowerBound: lowerBound, UpperBound: upperBound}) + itr, err := vdb.NewIter(&pebble.IterOptions{LowerBound: lowerBound, UpperBound: upperBound}) if err != nil { return nil, fmt.Errorf("failed to create PebbleDB iterator: %w", err) } @@ -544,7 +650,12 @@ func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error { worker := func() { defer wg.Done() - batch, err := NewBatch(db.storage, version) + currentDB, err := db.getOrCreateDBForVersion(version) + if err != nil { + panic(err) + } + + batch, err := NewBatch(currentDB.db, version) if err != nil { panic(err) } @@ -562,7 +673,7 @@ func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error { panic(err) } - batch, err = NewBatch(db.storage, version) + batch, err = NewBatch(currentDB.db, version) if err != nil { panic(err) } @@ -586,6 +697,34 @@ func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error { return nil } +func (db *Database) getDBForVersion(version int64) (*VersionedDB, error) { + for _, vdb := range db.dbs { + if version >= vdb.startVersion && version < vdb.endVersion { + return vdb, nil + } + } + return nil, fmt.Errorf("no database shard found for version %d", version) +} + +func (db *Database) getOrCreateDBForVersion(version int64) (*VersionedDB, error) { + vdb, err := db.getDBForVersion(version) + if err == nil { + return vdb, nil + } + // Need to create a new shard + startVersion := (version / db.config.VersionShardSize) * db.config.VersionShardSize + endVersion := startVersion + db.config.VersionShardSize + opts := db.getPebbleOptions() + newDB, err := openVersionedDB(db.config.DBDirectory, startVersion, endVersion, opts) + if err != nil { + return nil, err + } + db.dbs = append(db.dbs, newDB) + return newDB, nil +} + +// TODO: Raw import update with separate db per version +// Can't create a batch with multiple versions func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error { var wg sync.WaitGroup From f9f61baba0abc8551e3455554a654d126f747bf7 Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Thu, 21 Nov 2024 14:45:44 -0500 Subject: [PATCH 2/2] Revert pruning changes --- ss/pebbledb/db.go | 72 ++++++++++++++++------------------------------- 1 file changed, 24 insertions(+), 48 deletions(-) diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index eb6b750..6a8a904 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "math" - "os" "path/filepath" "strings" "sync" @@ -315,6 +314,7 @@ func (db *Database) Get(storeKey string, targetVersion int64, key []byte) ([]byt return nil, nil } + // Index into it for i := len(db.dbs) - 1; i >= 0; i-- { vdb := db.dbs[i] if targetVersion < vdb.startVersion { @@ -445,51 +445,21 @@ func (db *Database) writeAsyncInBackground() { // it has been updated. This occurs when that module's keys are updated in between pruning runs, the node after is restarted. // This is not a large issue given the next time that module is updated, it will be properly pruned thereafter. func (db *Database) Prune(version int64) error { - earliestVersion := version + 1 - - if earliestVersion > db.earliestVersion { - db.earliestVersion = earliestVersion + if len(db.dbs) != 0 { + return fmt.Errorf("Pruning not enabled when sharding by version") } - var newDBs []*VersionedDB - for _, vdb := range db.dbs { - if vdb.endVersion <= earliestVersion { - // Close and delete the entire shard - err := vdb.db.Close() - if err != nil { - return err - } - dbDir := filepath.Join(db.dataDir, fmt.Sprintf("%d_%d", vdb.startVersion, vdb.endVersion)) - err = os.RemoveAll(dbDir) - if err != nil { - return err - } - continue - } else if vdb.startVersion >= earliestVersion { - newDBs = append(newDBs, vdb) - } else { - // Partial overlap, prune keys in this shard - err := pruneVersionedDB(db, vdb, earliestVersion) - if err != nil { - return err - } - newDBs = append(newDBs, vdb) - } - } - - db.dbs = newDBs - - return nil -} + // Only one shard when pruning enabled + database := db.dbs[0].db + earliestVersion := version + 1 // we increment by 1 to include the provided version -func pruneVersionedDB(db *Database, vdb *VersionedDB, earliestVersion int64) error { - itr, err := vdb.db.NewIter(nil) + itr, err := database.NewIter(nil) if err != nil { return err } defer itr.Close() - batch := vdb.db.NewBatch() + batch := database.NewBatch() defer batch.Close() var ( @@ -516,7 +486,7 @@ func pruneVersionedDB(db *Database, vdb *VersionedDB, earliestVersion int64) err storeKey, err := parseStoreKey(currKey) if err != nil { - // This should never happen given we skip the metadata keys. + // XXX: This should never happen given we skip the metadata keys. return err } @@ -539,7 +509,7 @@ func pruneVersionedDB(db *Database, vdb *VersionedDB, earliestVersion int64) err // Seek to next key if we are at a version which is higher than prune height // Do not seek to next key if KeepLastVersion is false and we need to delete the previous key in pruning - if currVersionDecoded > earliestVersion && (db.config.KeepLastVersion || prevVersionDecoded > earliestVersion) { + if currVersionDecoded > version && (db.config.KeepLastVersion || prevVersionDecoded > version) { itr.NextPrefix() continue } @@ -547,7 +517,7 @@ func pruneVersionedDB(db *Database, vdb *VersionedDB, earliestVersion int64) err // Delete a key if another entry for that key exists at a larger version than original but leq to the prune height // Also delete a key if it has been tombstoned and its version is leq to the prune height // Also delete a key if KeepLastVersion is false and version is leq to the prune height - if prevVersionDecoded <= earliestVersion && (bytes.Equal(prevKey, currKey) || valTombstoned(prevValEncoded) || !db.config.KeepLastVersion) { + if prevVersionDecoded <= version && (bytes.Equal(prevKey, currKey) || valTombstoned(prevValEncoded) || !db.config.KeepLastVersion) { err = batch.Delete(prevKeyEncoded, nil) if err != nil { return err @@ -582,7 +552,7 @@ func pruneVersionedDB(db *Database, vdb *VersionedDB, earliestVersion int64) err } } - return nil + return db.SetEarliestVersion(earliestVersion) } func (db *Database) Iterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) { @@ -606,7 +576,7 @@ func (db *Database) Iterator(storeKey string, version int64, start, end []byte) upperBound = MVCCEncode(prependStoreKey(storeKey, end), 0) } - itr, err := vdb.storage.NewIter(&pebble.IterOptions{LowerBound: lowerBound, UpperBound: upperBound}) + itr, err := vdb.db.NewIter(&pebble.IterOptions{LowerBound: lowerBound, UpperBound: upperBound}) if err != nil { return nil, fmt.Errorf("failed to create PebbleDB iterator: %w", err) } @@ -698,12 +668,17 @@ func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error { } func (db *Database) getDBForVersion(version int64) (*VersionedDB, error) { - for _, vdb := range db.dbs { - if version >= vdb.startVersion && version < vdb.endVersion { - return vdb, nil - } + if len(db.dbs) == 0 { + return nil, fmt.Errorf("no database shards available") + } + if version < db.dbs[0].startVersion || version >= db.dbs[len(db.dbs)-1].endVersion { + return nil, fmt.Errorf("version %d is out of bounds", version) + } + index := int((version - db.dbs[0].startVersion) / db.config.VersionShardSize) + if index < 0 || index >= len(db.dbs) { + return nil, fmt.Errorf("no database shard found for version %d", version) } - return nil, fmt.Errorf("no database shard found for version %d", version) + return db.dbs[index], nil } func (db *Database) getOrCreateDBForVersion(version int64) (*VersionedDB, error) { @@ -725,6 +700,7 @@ func (db *Database) getOrCreateDBForVersion(version int64) (*VersionedDB, error) // TODO: Raw import update with separate db per version // Can't create a batch with multiple versions +// Create wrapper around func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error { var wg sync.WaitGroup