From e64b20b265c3a3cebefb26a1c57ce26347c8960f Mon Sep 17 00:00:00 2001 From: spongeboi <66492212+Shubhaankar-Sharma@users.noreply.github.com> Date: Fri, 1 Nov 2024 03:26:31 -0700 Subject: [PATCH] initial ethwal filter index (#9) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * wip: initial ethwal filter index * wip: renaming, refactoring + reader improvements * complete reader * Small refactor (#11) * remove indexes * Add IndexBuilder * Add ZSTD compression to index files * Add chainlens-api * fix: not updating block on peek * new index function signature + magic number * index roll policy * init tests * cleanup * [wip] last block indexed * index only if block is above last indexed * add method to get lowest indexed block * lowest indexed block tests * more tests * refactor * refactor: move fs to index * refactor: small changes + unit tests * fix: wrong renames * fix: gitignore * fix: rename again * fix: comment * refactor: again * refactor: move context to Eval --------- Co-authored-by: Marcin Górzyński --- .gitignore | 2 +- filter.go | 167 +++++++++++++++ filter_test.go | 405 ++++++++++++++++++++++++++++++++++++ go.mod | 19 +- go.sum | 50 ++++- index.go | 272 ++++++++++++++++++++++++ index_file.go | 61 ++++++ indexer.go | 152 ++++++++++++++ reader_with_filter.go | 99 +++++++++ reader_with_filter_test.go | 123 +++++++++++ storage/storage.go | 2 + writer.go | 16 ++ writer_file_roll_policy.go | 43 ++++ writer_no_gap.go | 18 +- writer_with_indexer.go | 88 ++++++++ writer_with_indexer_test.go | 59 ++++++ 16 files changed, 1562 insertions(+), 14 deletions(-) create mode 100644 filter.go create mode 100644 filter_test.go create mode 100644 index.go create mode 100644 index_file.go create mode 100644 indexer.go create mode 100644 reader_with_filter.go create mode 100644 reader_with_filter_test.go create mode 100644 writer_with_indexer.go create mode 100644 writer_with_indexer_test.go diff --git a/.gitignore b/.gitignore index 86d11cc..9f0695d 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,4 @@ ethwalcat ethwalinfo -ethwalcp \ No newline at end of file +ethwalcp diff --git a/filter.go b/filter.go new file mode 100644 index 0000000..e755e1c --- /dev/null +++ b/filter.go @@ -0,0 +1,167 @@ +package ethwal + +import ( + "cmp" + "context" + "fmt" + "path" + + "github.com/0xsequence/ethwal/storage" + "github.com/0xsequence/ethwal/storage/local" + "github.com/RoaringBitmap/roaring/v2/roaring64" +) + +type Filter interface { + Eval(ctx context.Context) FilterIterator +} + +type FilterIterator interface { + HasNext() bool + Next() (uint64, uint16) + Peek() (uint64, uint16) + Bitmap() *roaring64.Bitmap +} + +type FilterBuilder interface { + And(filters ...Filter) Filter + Or(filters ...Filter) Filter + Eq(index string, key string) Filter +} + +type FilterBuilderOptions[T any] struct { + Dataset Dataset + FileSystem storage.FS + + Indexes Indexes[T] +} + +func (o FilterBuilderOptions[T]) WithDefaults() FilterBuilderOptions[T] { + o.FileSystem = cmp.Or(o.FileSystem, storage.FS(local.NewLocalFS(""))) + return o +} + +type filterBuilder[T any] struct { + ctx context.Context + + indexes map[IndexName]Index[T] + fs storage.FS +} + +func NewFilterBuilder[T any](opt FilterBuilderOptions[T]) (FilterBuilder, error) { + // apply default options on uninitialized fields + opt = opt.WithDefaults() + + // mount indexes directory + fs := storage.NewPrefixWrapper(opt.FileSystem, fmt.Sprintf("%s/", path.Join(opt.Dataset.FullPath(), IndexesDirectory))) + + return 
&filterBuilder[T]{ + indexes: opt.Indexes, + fs: fs, + }, nil +} + +type filter struct { + resultSet func(ctx context.Context) *roaring64.Bitmap +} + +func (c *filter) Eval(ctx context.Context) FilterIterator { + if c.resultSet == nil { + c.resultSet = func(ctx context.Context) *roaring64.Bitmap { + return roaring64.New() + } + } + return newFilterIterator(c.resultSet(ctx)) +} + +func (c *filterBuilder[T]) And(filters ...Filter) Filter { + return &filter{ + resultSet: func(ctx context.Context) *roaring64.Bitmap { + var bmap *roaring64.Bitmap + for _, filter := range filters { + if filter == nil { + continue + } + + iter := filter.Eval(ctx) + if bmap == nil { + bmap = iter.Bitmap().Clone() + } else { + bmap.And(iter.Bitmap()) + } + } + return bmap + }, + } +} + +func (c *filterBuilder[T]) Or(filters ...Filter) Filter { + return &filter{ + resultSet: func(ctx context.Context) *roaring64.Bitmap { + var bmap *roaring64.Bitmap + for _, filter := range filters { + if filter == nil { + continue + } + + iter := filter.Eval(ctx) + if bmap == nil { + bmap = iter.Bitmap().Clone() + } else { + bmap.Or(iter.Bitmap()) + } + } + return bmap + }, + } +} + +func (c *filterBuilder[T]) Eq(index string, key string) Filter { + + return &filter{ + resultSet: func(ctx context.Context) *roaring64.Bitmap { + // fetch the index file and include it in the result set + index_ := IndexName(index).Normalize() + idx, ok := c.indexes[index_] + if !ok { + return roaring64.New() + } + + bitmap, err := idx.Fetch(ctx, c.fs, IndexedValue(key)) + if err != nil { + return roaring64.New() + } + return bitmap + }, + } +} + +type filterIterator struct { + iter roaring64.IntPeekable64 + bitmap *roaring64.Bitmap +} + +func newFilterIterator(bmap *roaring64.Bitmap) FilterIterator { + return &filterIterator{ + iter: bmap.Iterator(), + bitmap: bmap, + } +} + +func (f *filterIterator) HasNext() bool { + return f.iter.HasNext() +} + +func (f *filterIterator) Next() (uint64, uint16) { + // TODO: how to handle if there's no next? 
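+	// for now callers are expected to check HasNext before calling Next or Peek,
+	// which is how readerWithFilter.Read drives the iterator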
+ val := f.iter.Next() + return IndexCompoundID(val).Split() +} + +func (f *filterIterator) Peek() (uint64, uint16) { + val := f.iter.PeekNext() + return IndexCompoundID(val).Split() +} + +func (f *filterIterator) Bitmap() *roaring64.Bitmap { + return f.bitmap +} diff --git a/filter_test.go b/filter_test.go new file mode 100644 index 0000000..f733895 --- /dev/null +++ b/filter_test.go @@ -0,0 +1,405 @@ +package ethwal + +import ( + "context" + "fmt" + "math" + "os" + "testing" + + "github.com/0xsequence/ethkit/go-ethereum/common" + "github.com/0xsequence/ethwal/storage" + "github.com/stretchr/testify/assert" +) + +var ( + indexTestDir = ".tmp/ethwal_index_test" +) + +func setupMockData[T any](indexGenerator func() Indexes[T], blockGenerator func() []Block[T]) (*Indexer[T], Indexes[T], storage.FS, func(), error) { + indexes := indexGenerator() + indexer, err := NewIndexer(context.Background(), IndexerOptions[T]{ + Dataset: Dataset{Path: indexTestDir}, + Indexes: indexes, + }) + if err != nil { + return nil, nil, nil, nil, err + } + blocks := blockGenerator() + for _, block := range blocks { + err := indexer.Index(context.Background(), block) + if err != nil { + return nil, nil, nil, nil, err + } + } + err = indexer.Flush(context.Background()) + if err != nil { + return nil, nil, nil, nil, err + } + + return indexer, indexes, nil, cleanupIndexMockData(), nil +} + +func cleanupIndexMockData() func() { + return func() { + err := os.RemoveAll(indexTestDir) + if err != nil { + panic(err) + } + } +} + +func generateMixedIntBlocks() []Block[[]int] { + blocks := []Block[[]int]{} + + // 0-19 generate 20 blocks with only even data + // 20-39 generate 20 blocks with only odd data + // 40-44 generate 5 blocks with even + odd data + // 45-49 generate 5 blocks with no data + // 50-69 generate 20 blocks with random but repeating huge numbers + + for i := 1; i <= 20; i++ { + blocks = append(blocks, Block[[]int]{ + Hash: common.BytesToHash([]byte{byte(i)}), + Number: uint64(i), + Data: []int{i * 2}, + }) + } + + for i := 21; i <= 40; i++ { + blocks = append(blocks, Block[[]int]{ + Hash: common.BytesToHash([]byte{byte(i)}), + Number: uint64(i), + Data: []int{i*2 + 1}, + }) + } + + for i := 41; i <= 45; i++ { + blocks = append(blocks, Block[[]int]{ + Hash: common.BytesToHash([]byte{byte(i)}), + Number: uint64(i), + Data: []int{i*2 + 1, i*2 + 2}, + }) + } + + for i := 46; i <= 50; i++ { + blocks = append(blocks, Block[[]int]{ + Hash: common.BytesToHash([]byte{byte(i)}), + Number: uint64(i), + Data: []int{}, + }) + } + + numbers := []int{ + 121, + 123, + 125, + 999, + 777, + 333, + 555, + 111, + } + + for i := 51; i < 71; i++ { + data := []int{} + for j := i; j < i+20; j++ { + data = append(data, numbers[j%len(numbers)]) + } + + blocks = append(blocks, Block[[]int]{ + Hash: common.BytesToHash([]byte{byte(i)}), + Number: uint64(i), + Data: data, + }) + } + + return blocks +} + +func generateIntBlocks() []Block[[]int] { + blocks := []Block[[]int]{} + + for i := 0; i < 100; i++ { + blocks = append(blocks, Block[[]int]{ + Hash: common.BytesToHash([]byte{byte(i)}), + Number: uint64(i), + Data: []int{i, i + 1}, + }) + } + + return blocks +} + +func generateMixedIntIndexes() Indexes[[]int] { + indexes := Indexes[[]int]{} + indexes["all"] = NewIndex[[]int]("all", indexBlock) + indexes["odd_even"] = NewIndex[[]int]("odd_even", indexOddEvenBlocks) + indexes["only_even"] = NewIndex[[]int]("only_even", indexOnlyEvenBlocks) + indexes["only_odd"] = NewIndex[[]int]("only_odd", indexOnlyOddBlocks) + return indexes +} + +func 
generateIntIndexes() Indexes[[]int] { + indexes := Indexes[[]int]{} + indexes["all"] = NewIndex[[]int]("all", indexAll) + indexes["none"] = NewIndex[[]int]("none", indexNone) + return indexes +} + +func indexOddEvenBlocks(block Block[[]int]) (toIndex bool, indexValueMap map[IndexedValue][]uint16, err error) { + if len(block.Data) == 0 { + return false, nil, nil + } + + toIndex = true + indexValueMap = make(map[IndexedValue][]uint16) + indexValueMap["even"] = []uint16{} + indexValueMap["odd"] = []uint16{} + for i, data := range block.Data { + if data%2 == 0 { + indexValueMap["even"] = append(indexValueMap["even"], uint16(i)) + } else { + indexValueMap["odd"] = append(indexValueMap["odd"], uint16(i)) + } + } + + return +} + +func indexOnlyEvenBlocks(block Block[[]int]) (toIndex bool, indexValueMap map[IndexedValue][]uint16, err error) { + if len(block.Data) == 0 { + return false, nil, nil + } + + toIndex = true + indexValueMap = make(map[IndexedValue][]uint16) + for _, data := range block.Data { + if data%2 != 0 { + toIndex = false + break + } + } + + if toIndex { + indexValueMap["true"] = []uint16{math.MaxUint16} + } + + return +} +func indexOnlyOddBlocks(block Block[[]int]) (toIndex bool, indexValueMap map[IndexedValue][]uint16, err error) { + if len(block.Data) == 0 { + return false, nil, nil + } + + toIndex = true + indexValueMap = make(map[IndexedValue][]uint16) + for _, data := range block.Data { + if data%2 == 0 { + toIndex = false + break + } + } + + if toIndex { + indexValueMap["true"] = []uint16{math.MaxUint16} + } + + return +} + +func indexBlock(block Block[[]int]) (toIndex bool, indexValueMap map[IndexedValue][]uint16, err error) { + if len(block.Data) == 0 { + return false, nil, nil + } + + if block.Number < 50 { + return false, nil, nil + } + + toIndex = true + indexValueMap = make(map[IndexedValue][]uint16) + for i, data := range block.Data { + dataStr := IndexedValue(fmt.Sprintf("%d", data)) + if _, ok := indexValueMap[dataStr]; !ok { + indexValueMap[dataStr] = []uint16{} + } + indexValueMap[dataStr] = append(indexValueMap[dataStr], uint16(i)) + } + + return +} + +func indexAll(block Block[[]int]) (toIndex bool, indexValueMap map[IndexedValue][]uint16, err error) { + if len(block.Data) == 0 { + return false, nil, nil + } + + toIndex = true + indexValueMap = make(map[IndexedValue][]uint16) + for _, data := range block.Data { + dataStr := IndexedValue(fmt.Sprintf("%d", data)) + if _, ok := indexValueMap[dataStr]; !ok { + indexValueMap[dataStr] = []uint16{math.MaxUint16} + } + } + + return +} + +func indexNone(block Block[[]int]) (toIndex bool, indexValueMap map[IndexedValue][]uint16, err error) { + return false, nil, nil +} + +func TestMaxMagicCompoundID(t *testing.T) { + id := NewIndexCompoundID(uint64(math.Exp2(48)-1), math.MaxUint16) + assert.Equal(t, uint64(math.Exp2(48)-1), id.BlockNumber()) + assert.Equal(t, uint16(math.MaxUint16), id.DataIndex()) +} + +func TestIntMixFiltering(t *testing.T) { + _, indexes, _, cleanup, err := setupMockData(generateMixedIntIndexes, generateMixedIntBlocks) + assert.NoError(t, err) + defer cleanup() + + f, err := NewFilterBuilder(FilterBuilderOptions[[]int]{ + Dataset: Dataset{ + Path: indexTestDir, + }, + Indexes: indexes, + }) + assert.NoError(t, err) + assert.NotNil(t, f) + + onlyEvenFilter := f.Eq("only_even", "true") + onlyOddFilter := f.Eq("only_odd", "true") + oddFilter := f.Eq("odd_even", "odd") + numbersIdxs := []string{ + "121", + "123", + "125", + "999", + "777", + "333", + "555", + "111", + } + var numberFilter Filter + for _, 
number := range numbersIdxs { + if numberFilter == nil { + numberFilter = f.Eq("all", number) + } else { + numberFilter = f.Or(numberFilter, f.Eq("all", number)) + } + } + + onlyEvenResults := onlyEvenFilter.Eval(context.Background()) + assert.Len(t, onlyEvenResults.Bitmap().ToArray(), 20) + for _, id := range onlyEvenResults.Bitmap().ToArray() { + block, _ := IndexCompoundID(id).Split() + assert.True(t, block <= 20) + } + + onlyOddResults := onlyOddFilter.Eval(context.Background()) + assert.Len(t, onlyOddResults.Bitmap().ToArray(), 20+20) + for _, id := range onlyOddResults.Bitmap().ToArray() { + block, _ := IndexCompoundID(id).Split() + assert.True(t, (block > 20 && block < 41) || (block > 50 && block < 71)) + } + + numberAllResults := numberFilter.Eval(context.Background()) + // 20*20 + assert.Len(t, numberAllResults.Bitmap().ToArray(), 400) + for _, id := range numberAllResults.Bitmap().ToArray() { + block, _ := IndexCompoundID(id).Split() + assert.True(t, block > 50 && block < 71) + } + + allNumberAndOdd := f.And(numberFilter, oddFilter) + allNumberOddResults := allNumberAndOdd.Eval(context.Background()) + assert.ElementsMatch(t, numberAllResults.Bitmap().ToArray(), allNumberOddResults.Bitmap().ToArray()) +} + +func TestFiltering(t *testing.T) { + _, indexes, _, cleanup, err := setupMockData(generateIntIndexes, generateIntBlocks) + assert.NoError(t, err) + defer cleanup() + + f, err := NewFilterBuilder(FilterBuilderOptions[[]int]{ + Dataset: Dataset{Path: indexTestDir}, + Indexes: indexes, + }) + assert.NoError(t, err) + assert.NotNil(t, f) + result := f.Or(f.And(f.Eq("all", "1"), f.Eq("all", "2")), f.Eq("all", "3")).Eval(context.Background()) + // result should contain block 1, 2, 3 + assert.Len(t, result.Bitmap().ToArray(), 3) + block, _ := result.Next() + assert.Equal(t, uint64(1), block) + block, _ = result.Next() + assert.Equal(t, uint64(2), block) + block, _ = result.Next() + assert.Equal(t, uint64(3), block) + + result = f.And(f.Eq("all", "1"), f.Eq("all", "2")).Eval(context.Background()) + // result should contain block 1 + assert.Len(t, result.Bitmap().ToArray(), 1) + block, _ = result.Next() + assert.Equal(t, uint64(1), block) +} + +func TestLowestIndexedBlockNum(t *testing.T) { + indexer, indexes, _, cleanup, err := setupMockData(generateIntIndexes, generateIntBlocks) + assert.NoError(t, err) + defer cleanup() + + blockNum := indexer.BlockNum() + assert.Equal(t, uint64(99), blockNum) + + for _, i := range indexes { + i.numBlocksIndexed = nil + block, err := i.LastBlockNumIndexed(context.Background(), indexer.fs) + assert.NoError(t, err) + assert.Equal(t, uint64(99), block) + } + + indexes = generateIntIndexes() + indexer, err = NewIndexer(context.Background(), IndexerOptions[[]int]{ + Dataset: Dataset{Path: indexTestDir}, + Indexes: indexes, + }) + assert.NoError(t, err) + lowestBlockIndexed := indexer.BlockNum() + assert.Equal(t, uint64(99), lowestBlockIndexed) + + // add another filter... 
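+	// adding an index that has not been built yet pulls the indexer's BlockNum back to 0,
+	// since BlockNum reports the lowest block indexed across all registered indexes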
+ // indexes["odd_even"] = NewIndex("odd_even", indexOddEvenBlocks) + // setup fresh objects + indexes["odd_even"] = NewIndex("odd_even", indexOddEvenBlocks) + indexer, err = NewIndexer(context.Background(), IndexerOptions[[]int]{ + Dataset: Dataset{Path: indexTestDir}, + Indexes: indexes, + }) + assert.NoError(t, err) + lowestBlockIndexed = indexer.BlockNum() + assert.Equal(t, uint64(0), lowestBlockIndexed) + blocks := generateIntBlocks() + for _, block := range blocks[:50] { + err = indexer.Index(context.Background(), block) + assert.NoError(t, err) + } + err = indexer.Flush(context.Background()) + assert.NoError(t, err) + lowestBlockIndexed = indexer.BlockNum() + assert.Equal(t, uint64(49), lowestBlockIndexed) + + // index more blocks + for _, block := range blocks[50:] { + err = indexer.Index(context.Background(), block) + assert.NoError(t, err) + } + err = indexer.Flush(context.Background()) + assert.NoError(t, err) + lowestBlockIndexed = indexer.BlockNum() + assert.Equal(t, uint64(99), lowestBlockIndexed) +} diff --git a/go.mod b/go.mod index 84721fb..854b369 100644 --- a/go.mod +++ b/go.mod @@ -6,13 +6,17 @@ require ( cloud.google.com/go/storage v1.41.0 github.com/0xsequence/ethkit v1.26.1 github.com/DataDog/zstd v1.5.5 + github.com/RoaringBitmap/roaring/v2 v2.3.4 github.com/Shopify/go-storage v1.3.2 github.com/c2h5oh/datasize v0.0.0-20231215233829-aa82cc1e6500 github.com/fatih/structs v1.1.0 github.com/fxamacker/cbor/v2 v2.6.0 + github.com/go-chi/chi/v5 v5.1.0 + github.com/go-chi/cors v1.2.1 github.com/stretchr/testify v1.9.0 github.com/urfave/cli/v2 v2.27.2 golang.org/x/oauth2 v0.20.0 + golang.org/x/sync v0.7.0 ) require ( @@ -21,8 +25,15 @@ require ( cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect cloud.google.com/go/compute/metadata v0.3.0 // indirect cloud.google.com/go/iam v1.1.8 // indirect + github.com/bits-and-blooms/bitset v1.12.0 // indirect + github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect + github.com/consensys/bavard v0.1.13 // indirect + github.com/consensys/gnark-crypto v0.12.1 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect + github.com/crate-crypto/go-kzg-4844 v1.0.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect + github.com/ethereum/c-kzg-4844/bindings/go v0.0.0-20230126171313-363c7d7593b4 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -33,11 +44,12 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.4 // indirect github.com/holiman/uint256 v1.2.4 // indirect - github.com/kr/pretty v0.3.1 // indirect + github.com/mmcloughlin/addchain v0.4.0 // indirect + github.com/mschoch/smat v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - github.com/tidwall/btree v1.7.0 // indirect + github.com/supranational/blst v0.3.11-0.20230124161941-ca03e11a3ff2 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect go.opencensus.io v0.24.0 // indirect @@ -48,7 +60,6 @@ require ( go.opentelemetry.io/otel/trace v1.24.0 // indirect golang.org/x/crypto v0.23.0 // indirect golang.org/x/net v0.25.0 // indirect - golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.20.0 // indirect golang.org/x/text v0.15.0 // indirect 
golang.org/x/time v0.5.0 // indirect @@ -58,6 +69,6 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20240513163218-0867130af1f8 // indirect google.golang.org/grpc v1.63.2 // indirect google.golang.org/protobuf v1.34.1 // indirect - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + rsc.io/tmplfunc v0.0.3 // indirect ) diff --git a/go.sum b/go.sum index 3ac3cba..eb955f8 100644 --- a/go.sum +++ b/go.sum @@ -16,29 +16,53 @@ github.com/0xsequence/ethkit v1.26.1/go.mod h1:mot9svDfrz13RcDeK6sZnqHvbGac9kzXR github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ= github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= +github.com/RoaringBitmap/roaring/v2 v2.3.4 h1:gGEh3lC5rrrvp9NoZeIAUJ/QvvvJ2qiNNWn+9gbHsDk= +github.com/RoaringBitmap/roaring/v2 v2.3.4/go.mod h1:qhgqItwt5rQY0Nj4zw9nMhXv4Pkq2D8dA8RPJAfyU08= github.com/Shopify/go-storage v1.3.2 h1:POQkNXLEMLV3ra/YUYxqh0jD+/8R7bbLvJHyvo4jqd0= github.com/Shopify/go-storage v1.3.2/go.mod h1:+TaY1ck1poxnIMR9d20JcfBKuT6dhtaz8yAojOr8ID8= +github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA= +github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= +github.com/btcsuite/btcd v0.23.4 h1:IzV6qqkfwbItOS/sg/aDfPDsjPP8twrCOE2R93hxMlQ= +github.com/btcsuite/btcd/btcec/v2 v2.3.2 h1:5n0X6hX0Zk+6omWcihdYvdAlGf2DfasC0GMf7DClJ3U= +github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04= +github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U= +github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= github.com/c2h5oh/datasize v0.0.0-20231215233829-aa82cc1e6500 h1:6lhrsTEnloDPXyeZBvSYvQf8u86jbKehZPVDDlkgDl4= github.com/c2h5oh/datasize v0.0.0-20231215233829-aa82cc1e6500/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/consensys/bavard v0.1.13 h1:oLhMLOFGTLdlda/kma4VOJazblc7IM5y5QPd2A/YjhQ= +github.com/consensys/bavard v0.1.13/go.mod h1:9ItSMtA/dXMAiL7BG6bqW2m3NdSEObYWoH223nGHukI= +github.com/consensys/gnark-crypto v0.12.1 h1:lHH39WuuFgVHONRl3J0LRBtuYdQTumFSDtJF7HpyG8M= +github.com/consensys/gnark-crypto v0.12.1/go.mod h1:v2Gy7L/4ZRosZ7Ivs+9SfUDr0f5UlG+EM5t7MPHiLuY= github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/crate-crypto/go-kzg-4844 v1.0.0 h1:TsSgHwrkTKecKJ4kadtHi4b3xHW5dCFUDFnUp1TsawI= +github.com/crate-crypto/go-kzg-4844 v1.0.0/go.mod h1:1kMhvPgI0Ky3yIa+9lFySEBUBXkYxeOi8ZF1sYioxhc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/decred/dcrd/crypto/blake256 v1.0.0 
h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0= +github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 h1:HbphB4TFFXpv7MNrT52FGrrgVXF1owhMVTHFZIlnvd4= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0/go.mod h1:DZGJHZMqrU4JJqFAWUS2UO1+lbSKsdiOoYi9Zzey7Fc= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/ethereum/c-kzg-4844/bindings/go v0.0.0-20230126171313-363c7d7593b4 h1:B2mpK+MNqgPqk2/KNi1LbqwtZDy5F7iy0mynQiBr8VA= +github.com/ethereum/c-kzg-4844/bindings/go v0.0.0-20230126171313-363c7d7593b4/go.mod h1:y4GA2JbAUama1S4QwYjC2hefgGLU8Ul0GMtL/ADMF1c= github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fxamacker/cbor/v2 v2.6.0 h1:sU6J2usfADwWlYDAFhZBQ6TnLFBHxgesMrQfQgk1tWA= github.com/fxamacker/cbor/v2 v2.6.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= +github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= +github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= +github.com/go-chi/cors v1.2.1 h1:xEC8UT3Rlp2QuWNEr4Fs/c2EAGVKBwy/1vHx3bppil4= +github.com/go-chi/cors v1.2.1/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -72,6 +96,7 @@ github.com/google/martian/v3 v3.3.3 h1:DIhPTQrbPkgs2yJYdXU/eNACCG5DVQjySNRNlflZ9 github.com/google/martian/v3 v3.3.3/go.mod h1:iEPrYcgCF7jA9OtScMFQyAlZZ4YXTKEtJ1E6RWzmBA0= github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= +github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -81,18 +106,22 @@ github.com/googleapis/gax-go/v2 v2.12.4 h1:9gWcmF85Wvq4ryPFvGFaOgPIs1AQX0d0bcbGw github.com/googleapis/gax-go/v2 v2.12.4/go.mod h1:KYEYLorsnIGDi/rPC8b5TdlB9kbKoFubselGIoBMCwI= github.com/holiman/uint256 v1.2.4 h1:jUc4Nk8fm9jZabQuqr2JzednajVmBpC+oiTiXZJEApU= github.com/holiman/uint256 v1.2.4/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= 
-github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= +github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= +github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY= +github.com/mmcloughlin/addchain v0.4.0/go.mod h1:A86O+tHqZLMNO4w6ZZ4FlVQEadcoqkyU72HC5wJ4RlU= +github.com/mmcloughlin/profile v0.1.1/go.mod h1:IhHD7q1ooxgwTgjxQYkACGA77oFTDdFVejUS1/tS/qU= +github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= +github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= @@ -102,13 +131,14 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI= -github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY= +github.com/supranational/blst v0.3.11-0.20230124161941-ca03e11a3ff2 h1:wh1wzwAhZBNiZO37uWS/nDaKiIwHz4mDo4pnA+fqTO0= +github.com/supranational/blst v0.3.11-0.20230124161941-ca03e11a3ff2/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= github.com/urfave/cli/v2 v2.27.2 h1:6e0H+AkS+zDckwPCUrZkKX38mRaau4nL2uipkJpbkcI= github.com/urfave/cli/v2 v2.27.2/go.mod h1:g0+79LmHHATl7DAcHO99smiR/T7uGLw84w8Y42x+4eM= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= @@ -207,8 +237,12 @@ google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHh gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +rsc.io/tmplfunc v0.0.3 h1:53XFQh69AfOa8Tw0Jm7t+GV7KZhOi6jzsCzTtKbMvzU= +rsc.io/tmplfunc v0.0.3/go.mod h1:AG3sTPzElb1Io3Yg4voV9AGZJuleGAwaVRxL9M49PhA= diff --git a/index.go b/index.go new file mode 100644 index 0000000..6530b2c --- /dev/null +++ b/index.go @@ -0,0 +1,272 @@ +package ethwal + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/binary" + "fmt" + "io" + "math" + "strings" + "sync/atomic" + + "github.com/0xsequence/ethwal/storage" + "github.com/RoaringBitmap/roaring/v2/roaring64" +) + +// IndexAllDataIndexes is a special position that indicates that all data indexes should be indexed. +const IndexAllDataIndexes = math.MaxUint16 + +// IndexFunction is a function that indexes a block. +// +// The function should return true if the block should be indexed, and false otherwise. +// The function should return an error if the indexing fails. +// The function should return a map of index values to positions in the block. +type IndexFunction[T any] func(block Block[T]) (toIndex bool, indexValueMap map[IndexedValue][]uint16, err error) + +// IndexCompoundID is a compound ID for an index. It is a combination of the block number and the index within the block. +type IndexCompoundID uint64 + +func NewIndexCompoundID(blockNum uint64, dataIndex uint16) IndexCompoundID { + return IndexCompoundID(uint64(blockNum<<16 | uint64(dataIndex))) +} + +func (i IndexCompoundID) BlockNumber() uint64 { + return (uint64(i) & 0xFFFFFFFFFFFF0000) >> 16 +} + +func (i IndexCompoundID) DataIndex() uint16 { + return uint16(i) & 0xFFFF +} + +func (i IndexCompoundID) Split() (uint64, uint16) { + return i.BlockNumber(), i.DataIndex() +} + +// IndexName is the name of an index. +type IndexName string + +func (i IndexName) Normalize() IndexName { + return IndexName(strings.ToLower(string(i))) +} + +// IndexedValue is the indexed value of an index. +type IndexedValue string + +// IndexUpdate is a map of indexed values and their corresponding bitmaps. +type IndexUpdate struct { + Data map[IndexedValue]*roaring64.Bitmap + LastBlockNum uint64 +} + +func (u *IndexUpdate) Merge(update *IndexUpdate) { + for indexValue, bm := range update.Data { + if _, ok := u.Data[indexValue]; !ok { + u.Data[indexValue] = roaring64.New() + } + u.Data[indexValue].Or(bm) + } + + if u.LastBlockNum < update.LastBlockNum { + u.LastBlockNum = update.LastBlockNum + } +} + +// Indexes is a map of index names to indexes. +type Indexes[T any] map[IndexName]Index[T] + +// Index is an index struct. 
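+// It pairs an IndexName with the IndexFunction used to build it and caches the last
+// block number indexed so it does not have to be re-read from storage on every call.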
+type Index[T any] struct { + name IndexName + indexFunc IndexFunction[T] + + numBlocksIndexed *atomic.Uint64 +} + +func NewIndex[T any](name IndexName, indexFunc IndexFunction[T]) Index[T] { + return Index[T]{ + name: name.Normalize(), + indexFunc: indexFunc, + } +} + +func (i *Index[T]) Name() IndexName { + return i.name +} + +func (i *Index[T]) Fetch(ctx context.Context, fs storage.FS, indexValue IndexedValue) (*roaring64.Bitmap, error) { + file, err := NewIndexFile(fs, i.name, indexValue) + if err != nil { + return nil, fmt.Errorf("failed to open IndexBlock file: %w", err) + } + bmap, err := file.Read(ctx) + if err != nil { + return nil, err + } + + return bmap, nil +} + +func (i *Index[T]) IndexBlock(ctx context.Context, fs storage.FS, block Block[T]) (*IndexUpdate, error) { + numBlocksIndexed, err := i.LastBlockNumIndexed(ctx, fs) + if err != nil { + return nil, fmt.Errorf("unexpected: failed to get number of blocks indexed: %w", err) + } + + if block.Number <= numBlocksIndexed { + return nil, nil + } + + toIndex, indexValueMap, err := i.indexFunc(block) + if err != nil { + return nil, fmt.Errorf("failed to IndexBlock block: %w", err) + } + if !toIndex { + return &IndexUpdate{LastBlockNum: block.Number}, nil + } + + indexValueCompoundMap := make(map[IndexedValue][]IndexCompoundID) + for indexValue, positions := range indexValueMap { + if _, ok := indexValueMap[indexValue]; !ok { + indexValueCompoundMap[indexValue] = make([]IndexCompoundID, 0) + } + for _, pos := range positions { + indexValueCompoundMap[indexValue] = append(indexValueCompoundMap[indexValue], NewIndexCompoundID(block.Number, pos)) + } + } + + indexUpdate := &IndexUpdate{ + Data: make(map[IndexedValue]*roaring64.Bitmap), + LastBlockNum: block.Number, + } + for indexValue, indexIDs := range indexValueCompoundMap { + bm, ok := indexUpdate.Data[indexValue] + if !ok { + bm = roaring64.New() + indexUpdate.Data[indexValue] = bm + } + + for _, indexID := range indexIDs { + bm.Add(uint64(indexID)) + } + } + return indexUpdate, nil +} + +func (i *Index[T]) Store(ctx context.Context, fs storage.FS, indexUpdate *IndexUpdate) error { + lastBlockNumIndexed, err := i.LastBlockNumIndexed(ctx, fs) + if err != nil { + return fmt.Errorf("failed to get number of blocks indexed: %w", err) + } + if lastBlockNumIndexed >= indexUpdate.LastBlockNum { + return nil + } + + for indexValue, bmUpdate := range indexUpdate.Data { + if bmUpdate.IsEmpty() { + continue + } + + file, err := NewIndexFile(fs, i.name, indexValue) + if err != nil { + return fmt.Errorf("failed to open or create IndexBlock file: %w", err) + } + + bmap, err := file.Read(ctx) + if err != nil { + return err + } + + bmap.Or(bmUpdate) + + err = file.Write(ctx, bmap) + if err != nil { + return err + } + } + + err = i.storeLastBlockNumIndexed(ctx, fs, indexUpdate.LastBlockNum) + if err != nil { + return fmt.Errorf("failed to index number of blocks indexed: %w", err) + } + + return nil +} + +func (i *Index[T]) LastBlockNumIndexed(ctx context.Context, fs storage.FS) (uint64, error) { + if i.numBlocksIndexed != nil { + return i.numBlocksIndexed.Load(), nil + } + + file, err := fs.Open(ctx, indexedBlockNumFilePath(string(i.name)), nil) + if err != nil { + // file doesn't exist + return 0, nil + } + defer file.Close() + + buf, err := io.ReadAll(file) + if err != nil { + return 0, fmt.Errorf("failed to read IndexBlock file: %w", err) + } + + var numBlocksIndexed uint64 + err = binary.Read(bytes.NewReader(buf), binary.BigEndian, &numBlocksIndexed) + if err != nil { + return 0, 
fmt.Errorf("failed to unmarshal bitmap: %w", err) + } + + i.numBlocksIndexed = &atomic.Uint64{} + i.numBlocksIndexed.Store(numBlocksIndexed) + + return numBlocksIndexed, nil +} + +func (i *Index[T]) storeLastBlockNumIndexed(ctx context.Context, fs storage.FS, numBlocksIndexed uint64) error { + var prevBlockIndexed uint64 + blocksIndexed, err := i.LastBlockNumIndexed(ctx, fs) + if err == nil { + prevBlockIndexed = blocksIndexed + } + + if prevBlockIndexed >= numBlocksIndexed { + return nil + } + + file, err := fs.Create(ctx, indexedBlockNumFilePath(string(i.name)), nil) + if err != nil { + return fmt.Errorf("failed to open IndexBlock file: %w", err) + } + + err = binary.Write(file, binary.BigEndian, numBlocksIndexed) + if err != nil { + return fmt.Errorf("failed to write IndexBlock file: %w", err) + } + + err = file.Close() + if err != nil { + return fmt.Errorf("failed to close IndexBlock file: %w", err) + } + + if i.numBlocksIndexed == nil { + i.numBlocksIndexed = &atomic.Uint64{} + } + i.numBlocksIndexed.Store(numBlocksIndexed) + return nil +} + +func indexedBlockNumFilePath(index string) string { + return fmt.Sprintf("%s/%s", index, "indexed") +} + +func indexPath(index string, indexValue string) string { + hash := sha256.Sum224([]byte(indexValue)) + return fmt.Sprintf("%s/%06d/%06d/%06d/%s", + index, + binary.BigEndian.Uint64(hash[0:8])%NumberOfDirectoriesPerLevel, // level0 + binary.BigEndian.Uint64(hash[8:16])%NumberOfDirectoriesPerLevel, // level1 + binary.BigEndian.Uint64(hash[16:24])%NumberOfDirectoriesPerLevel, // level2 + fmt.Sprintf("%s.idx", indexValue), // filename + ) +} diff --git a/index_file.go b/index_file.go new file mode 100644 index 0000000..454ee7e --- /dev/null +++ b/index_file.go @@ -0,0 +1,61 @@ +package ethwal + +import ( + "context" + "fmt" + "io" + + "github.com/0xsequence/ethwal/storage" + "github.com/RoaringBitmap/roaring/v2/roaring64" +) + +type IndexFile struct { + fs storage.FS + path string +} + +func NewIndexFile(fs storage.FS, indexName IndexName, value IndexedValue) (*IndexFile, error) { + path := indexPath(string(indexName), string(value)) + return &IndexFile{fs: fs, path: path}, nil +} + +func (i *IndexFile) Read(ctx context.Context) (*roaring64.Bitmap, error) { + file, err := i.fs.Open(ctx, i.path, nil) + if err != nil { + // TODO: decide if we should report an error or just create a new roaring bitmap... + // with this approach we are not reporting an error if the file does not exist + // and we just write the new bitmap when write is called... 
+ // return nil, fmt.Errorf("failed to open IndexBlock file: %w", err) + return roaring64.New(), nil + } + defer file.Close() + + decomp := NewZSTDDecompressor(file) + defer decomp.Close() + + buf, err := io.ReadAll(decomp) + if err != nil { + return nil, fmt.Errorf("failed to read IndexBlock file: %w", err) + } + bmap := roaring64.New() + err = bmap.UnmarshalBinary(buf) + + if err != nil { + return nil, fmt.Errorf("failed to unmarshal bitmap: %w", err) + } + return bmap, nil +} + +func (i *IndexFile) Write(ctx context.Context, bmap *roaring64.Bitmap) error { + file, err := i.fs.Create(ctx, i.path, nil) + if err != nil { + return fmt.Errorf("failed to open IndexBlock file: %w", err) + } + defer file.Close() + + comp := NewZSTDCompressor(file) + defer comp.Close() + + _, err = bmap.WriteTo(comp) + return err +} diff --git a/indexer.go b/indexer.go new file mode 100644 index 0000000..6deeb1d --- /dev/null +++ b/indexer.go @@ -0,0 +1,152 @@ +package ethwal + +import ( + "cmp" + "context" + "fmt" + "math" + "path" + "sync" + + "github.com/0xsequence/ethwal/storage" + "github.com/0xsequence/ethwal/storage/local" + "github.com/RoaringBitmap/roaring/v2/roaring64" + "github.com/c2h5oh/datasize" + "golang.org/x/sync/errgroup" +) + +const IndexesDirectory = ".indexes" + +type IndexerOptions[T any] struct { + Dataset Dataset + FileSystem storage.FS + + Indexes Indexes[T] +} + +func (o IndexerOptions[T]) WithDefaults() IndexerOptions[T] { + o.FileSystem = cmp.Or(o.FileSystem, storage.FS(local.NewLocalFS(""))) + return o +} + +type Indexer[T any] struct { + indexes map[IndexName]Index[T] + indexUpdates map[IndexName]*IndexUpdate + fs storage.FS + + mu sync.Mutex +} + +func NewIndexer[T any](ctx context.Context, opt IndexerOptions[T]) (*Indexer[T], error) { + // apply default options on uninitialized fields + opt = opt.WithDefaults() + + // mount indexes directory + fs := storage.NewPrefixWrapper(opt.FileSystem, fmt.Sprintf("%s/", path.Join(opt.Dataset.FullPath(), IndexesDirectory))) + + // populate indexUpdates with last block number indexed + indexMaps := make(map[IndexName]*IndexUpdate) + for _, index := range opt.Indexes { + lastBlockNum, err := index.LastBlockNumIndexed(ctx, fs) + if err != nil { + return nil, fmt.Errorf("Indexer.NewIndexer: failed to get last block number indexed for %s: %w", index.Name(), err) + } + + indexMaps[index.name] = &IndexUpdate{Data: make(map[IndexedValue]*roaring64.Bitmap), LastBlockNum: lastBlockNum} + } + + return &Indexer[T]{ + indexes: opt.Indexes, + indexUpdates: indexMaps, + fs: fs, + }, nil +} + +func (i *Indexer[T]) Index(ctx context.Context, block Block[T]) error { + for _, index := range i.indexes { + bmUpdate, err := index.IndexBlock(ctx, i.fs, block) + if err != nil { + return err + } + if bmUpdate == nil { + continue + } + + i.mu.Lock() + updateBatch := i.indexUpdates[index.name] + updateBatch.Merge(bmUpdate) + i.indexUpdates[index.name] = updateBatch + i.mu.Unlock() + } + + return nil +} + +func (i *Indexer[T]) EstimatedBatchSize() datasize.ByteSize { + i.mu.Lock() + defer i.mu.Unlock() + + var size datasize.ByteSize = 0 + for _, indexUpdate := range i.indexUpdates { + for _, bm := range indexUpdate.Data { + size += datasize.ByteSize(bm.GetSizeInBytes()) + } + } + return size +} + +func (i *Indexer[T]) Flush(ctx context.Context) error { + i.mu.Lock() + defer i.mu.Unlock() + + errGrp, gCtx := errgroup.WithContext(ctx) + + for name, indexUpdate := range i.indexUpdates { + idx, ok := i.indexes[name] + if !ok { + continue + } + + errGrp.Go(func() error { + 
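+			// each index update is persisted concurrently; Store is a no-op when the
+			// update's LastBlockNum is not newer than the last block number already stored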
err := idx.Store(gCtx, i.fs, indexUpdate) + if err != nil { + return err + } + return nil + }) + } + + err := errGrp.Wait() + if err != nil { + return fmt.Errorf("Indexer.Flush: failed to flush indexes: %w", err) + } + + // clear indexUpdates + for _, index := range i.indexes { + i.indexUpdates[index.name].Data = make(map[IndexedValue]*roaring64.Bitmap) + } + return nil +} + +// BlockNum returns the lowest block number indexed by all indexes. If no blocks have been indexed, it returns 0. +// This is useful for determining the starting block number for a new Indexer. +func (i *Indexer[T]) BlockNum() uint64 { + i.mu.Lock() + defer i.mu.Unlock() + + var lowestBlockNum uint64 = math.MaxUint64 + for _, indexUpdate := range i.indexUpdates { + if indexUpdate.LastBlockNum < lowestBlockNum { + lowestBlockNum = indexUpdate.LastBlockNum + } + } + + if lowestBlockNum == math.MaxUint64 { + return 0 + } + return lowestBlockNum +} + +func (i *Indexer[T]) Close(ctx context.Context) error { + return i.Flush(ctx) +} diff --git a/reader_with_filter.go b/reader_with_filter.go new file mode 100644 index 0000000..7f43208 --- /dev/null +++ b/reader_with_filter.go @@ -0,0 +1,99 @@ +package ethwal + +import ( + "context" + "io" + "reflect" +) + +type readerWithFilter[T any] struct { + lastBlockNum uint64 + reader Reader[T] + filter Filter + iterator FilterIterator +} + +var _ Reader[any] = (*readerWithFilter[any])(nil) + +func NewReaderWithFilter[T any](reader Reader[T], filter Filter) (Reader[T], error) { + return &readerWithFilter[T]{ + reader: reader, + filter: filter, + }, nil +} + +func (c *readerWithFilter[T]) FilesNum() int { + return c.reader.FilesNum() +} + +func (c *readerWithFilter[T]) Seek(ctx context.Context, blockNum uint64) error { + iter := c.filter.Eval(ctx) + for iter.HasNext() { + nextBlock, _ := iter.Peek() + if nextBlock >= blockNum { + break + } + iter.Next() + } + + c.iterator = iter + return nil +} + +func (c *readerWithFilter[T]) BlockNum() uint64 { + return c.lastBlockNum +} + +func (c *readerWithFilter[T]) Read(ctx context.Context) (Block[T], error) { + // Lazy init iterator + if c.iterator == nil { + c.iterator = c.filter.Eval(ctx) + } + + // Check if there are no more blocks to read + if !c.iterator.HasNext() { + return Block[T]{}, io.EOF + } + + // Collect all data indexes for the block + blockNum, dataIndex := c.iterator.Next() + dataIndexes := []uint16{dataIndex} + + doFilter := dataIndex != IndexAllDataIndexes + for c.iterator.HasNext() { + nextBlockNum, nextDataIndex := c.iterator.Peek() + if blockNum != nextBlockNum { + break + } + + _, _ = c.iterator.Next() + dataIndexes = append(dataIndexes, nextDataIndex) + } + + // Seek to the block + err := c.reader.Seek(ctx, blockNum) + if err != nil { + return Block[T]{}, err + } + + block, err := c.reader.Read(ctx) + if err != nil { + return Block[T]{}, err + } + + // Filter the block data + if dType := reflect.TypeOf(block.Data); doFilter && (dType.Kind() == reflect.Slice || dType.Kind() == reflect.Array) { + newData := reflect.Indirect(reflect.New(dType)) + for _, dataIndex := range dataIndexes { + newData = reflect.Append(newData, reflect.ValueOf(block.Data).Index(int(dataIndex))) + } + block.Data = newData.Interface().(T) + } + + c.lastBlockNum = blockNum + return block, nil +} + +func (c *readerWithFilter[T]) Close() error { + return c.reader.Close() +} diff --git a/reader_with_filter_test.go b/reader_with_filter_test.go new file mode 100644 index 0000000..fe7ae29 --- /dev/null +++ b/reader_with_filter_test.go @@ -0,0 +1,123 @@ 
+package ethwal + +import ( + "context" + "errors" + "io" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func setupReaderWithFilterTest(t *testing.T) Indexes[[]int] { + opt := Options{ + Dataset: Dataset{ + Path: testPath, + }, + NewCompressor: NewZSTDCompressor, + NewDecompressor: NewZSTDDecompressor, + NewEncoder: NewCBOREncoder, + NewDecoder: NewCBORDecoder, + FileRollOnClose: true, + } + + w, err := NewWriter[[]int](opt) + require.NoError(t, err) + + blocks := generateMixedIntBlocks() + for _, block := range blocks { + err := w.Write(context.Background(), block) + require.NoError(t, err) + } + + w.Close(context.Background()) + + indexes := generateMixedIntIndexes() + + ib, err := NewIndexer(context.Background(), IndexerOptions[[]int]{ + Dataset: opt.Dataset, + Indexes: indexes, + }) + require.NoError(t, err) + + for _, block := range blocks { + err := ib.Index(context.Background(), block) + require.NoError(t, err) + } + + err = ib.Flush(context.Background()) + require.NoError(t, err) + + return indexes +} + +func teardownReaderWithFilterTest() { + _ = os.RemoveAll(testPath) +} + +func TestReaderWithFilter(t *testing.T) { + indexes := setupReaderWithFilterTest(t) + defer teardownReaderWithFilterTest() + + r, err := NewReader[[]int](Options{ + Dataset: Dataset{ + Path: testPath, + }, + NewDecompressor: NewZSTDDecompressor, + NewDecoder: NewCBORDecoder, + }) + require.NoError(t, err) + + fb, err := NewFilterBuilder(FilterBuilderOptions[[]int]{ + Dataset: Dataset{ + Path: testPath, + }, + Indexes: indexes, + }) + require.NoError(t, err) + + r, err = NewReaderWithFilter[[]int](r, fb.Eq("only_even", "true")) + require.NoError(t, err) + + for { + block, err := r.Read(context.Background()) + if errors.Is(err, io.EOF) { + break + } + require.NoError(t, err) + + for _, i := range block.Data { + assert.Equal(t, 0, i%2) + } + } + + _ = r.Close() + + r, err = NewReader[[]int](Options{ + Dataset: Dataset{ + Path: testPath, + }, + NewDecompressor: NewZSTDDecompressor, + NewDecoder: NewCBORDecoder, + }) + require.NoError(t, err) + + r, err = NewReaderWithFilter[[]int](r, fb.Eq("only_odd", "true")) + require.NoError(t, err) + + for { + block, err := r.Read(context.Background()) + if errors.Is(err, io.EOF) { + break + } + require.NoError(t, err) + + for _, i := range block.Data { + assert.Equal(t, 1, i%2) + } + } + + _ = r.Close() +} diff --git a/storage/storage.go b/storage/storage.go index e6e9fb0..ff7a0f0 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -6,6 +6,8 @@ type FS storage.FS type Walker storage.Walker +type File storage.File + var NewPrefixWrapper = storage.NewPrefixWrapper var NewCacheWrapper = storage.NewCacheWrapper diff --git a/writer.go b/writer.go index 1e9c40a..6507c17 100644 --- a/writer.go +++ b/writer.go @@ -13,10 +13,13 @@ import ( ) type Writer[T any] interface { + FileSystem() storage.FS Write(ctx context.Context, b Block[T]) error BlockNum() uint64 RollFile(ctx context.Context) error Close(ctx context.Context) error + Options() Options + SetOptions(opt Options) } type writer[T any] struct { @@ -92,6 +95,10 @@ func NewWriter[T any](opt Options) (Writer[T], error) { }, nil } +func (w *writer[T]) FileSystem() storage.FS { + return w.fs +} + func (w *writer[T]) Write(ctx context.Context, b Block[T]) error { w.mu.Lock() defer w.mu.Unlock() @@ -155,6 +162,14 @@ func (w *writer[T]) Close(ctx context.Context) error { return nil } +func (w *writer[T]) Options() Options { + return w.options +} + +func (w *writer[T]) 
SetOptions(opt Options) { + w.options = opt +} + func (w *writer[T]) isReadyToWrite() bool { return w.encoder != nil } @@ -184,6 +199,7 @@ func (w *writer[T]) rollFile(ctx context.Context) error { func (w *writer[T]) writeFile(ctx context.Context) error { // create new file newFile := &File{FirstBlockNum: w.firstBlockNum, LastBlockNum: w.lastBlockNum} + w.options.FileRollPolicy.onFlush(ctx) // add file to file index err := w.fileIndex.AddFile(newFile) diff --git a/writer_file_roll_policy.go b/writer_file_roll_policy.go index e9d98ac..d455d06 100644 --- a/writer_file_roll_policy.go +++ b/writer_file_roll_policy.go @@ -13,6 +13,7 @@ type FileRollPolicy interface { onWrite(data []byte) onBlockProcessed(blockNum uint64) + onFlush(ctx context.Context) } type fileSizeRollPolicy struct { @@ -38,6 +39,8 @@ func (p *fileSizeRollPolicy) onWrite(data []byte) { func (p *fileSizeRollPolicy) onBlockProcessed(blockNum uint64) {} +func (p *fileSizeRollPolicy) onFlush(ctx context.Context) {} + // fileStats is a writer that keeps track of the number of bytes written to it. type writerWrapper struct { io.Writer @@ -74,6 +77,8 @@ func (l *lastBlockNumberRollPolicy) onBlockProcessed(blockNum uint64) { l.lastBlockNum = blockNum } +func (l *lastBlockNumberRollPolicy) onFlush(ctx context.Context) {} + type timeBasedRollPolicy struct { rollInterval time.Duration onError func(err error) @@ -107,6 +112,8 @@ func (t *timeBasedRollPolicy) onWrite(data []byte) {} func (t *timeBasedRollPolicy) onBlockProcessed(blockNum uint64) {} +func (t *timeBasedRollPolicy) onFlush(ctx context.Context) {} + type FileRollPolicies []FileRollPolicy func (policies FileRollPolicies) ShouldRoll() bool { @@ -136,4 +143,40 @@ func (policies FileRollPolicies) onBlockProcessed(blockNum uint64) { } } +func (policies FileRollPolicies) onFlush(ctx context.Context) { + for _, p := range policies { + p.onFlush(ctx) + } +} + +type wrappedRollPolicy struct { + rollPolicy FileRollPolicy + flushFunc func(ctx context.Context) +} + +func NewWrappedRollPolicy(rollPolicy FileRollPolicy, flushFunc func(ctx context.Context)) FileRollPolicy { + return &wrappedRollPolicy{rollPolicy: rollPolicy, flushFunc: flushFunc} +} + +func (w *wrappedRollPolicy) ShouldRoll() bool { + return w.rollPolicy.ShouldRoll() +} + +func (w *wrappedRollPolicy) Reset() { + w.rollPolicy.Reset() +} + +func (w *wrappedRollPolicy) onWrite(data []byte) { + w.rollPolicy.onWrite(data) +} + +func (w *wrappedRollPolicy) onBlockProcessed(blockNum uint64) { + w.rollPolicy.onBlockProcessed(blockNum) +} + +func (w *wrappedRollPolicy) onFlush(ctx context.Context) { + w.rollPolicy.onFlush(ctx) + w.flushFunc(ctx) +} + var _ FileRollPolicy = &fileSizeRollPolicy{} diff --git a/writer_no_gap.go b/writer_no_gap.go index e16a62d..2a1df1b 100644 --- a/writer_no_gap.go +++ b/writer_no_gap.go @@ -1,6 +1,10 @@ package ethwal -import "context" +import ( + "context" + + "github.com/0xsequence/ethwal/storage" +) type noGapWriter[T any] struct { w Writer[T] @@ -12,6 +16,10 @@ func NewWriterNoGap[T any](w Writer[T]) Writer[T] { return &noGapWriter[T]{w: w} } +func (n *noGapWriter[T]) FileSystem() storage.FS { + return n.w.FileSystem() +} + func (n *noGapWriter[T]) Write(ctx context.Context, b Block[T]) error { defer func() { n.lastBlockNum = b.Number }() @@ -46,3 +54,11 @@ func (n *noGapWriter[T]) BlockNum() uint64 { func (n *noGapWriter[T]) Close(ctx context.Context) error { return n.w.Close(ctx) } + +func (n *noGapWriter[T]) Options() Options { + return n.w.Options() +} + +func (n *noGapWriter[T]) 
SetOptions(opts Options) { + n.w.SetOptions(opts) +} diff --git a/writer_with_indexer.go b/writer_with_indexer.go new file mode 100644 index 0000000..e9a9213 --- /dev/null +++ b/writer_with_indexer.go @@ -0,0 +1,88 @@ +package ethwal + +import ( + "context" + "fmt" + "log" + + "github.com/0xsequence/ethwal/storage" +) + +type writerWithIndexer[T any] struct { + writer Writer[T] + + indexer *Indexer[T] +} + +var _ Writer[any] = (*writerWithIndexer[any])(nil) + +func NewWriterWithIndexer[T any](writer Writer[T], indexer *Indexer[T]) (Writer[T], error) { + if writer.BlockNum() > indexer.BlockNum() { + // todo: implement a way to catch up indexer with writer + // this should never happen if the writer with indexer is used + return nil, fmt.Errorf("writer is ahead of indexer, can't catch up") + } + + opts := writer.Options() + wrappedPolicy := NewWrappedRollPolicy(opts.FileRollPolicy, func(ctx context.Context) { + err := indexer.Flush(ctx) + if err != nil { + log.Default().Println("failed to flush index", "err", err) + } + }) + opts.FileRollPolicy = wrappedPolicy + writer.SetOptions(opts) + + return &writerWithIndexer[T]{indexer: indexer, writer: writer}, nil +} + +func (c *writerWithIndexer[T]) FileSystem() storage.FS { + return c.writer.FileSystem() +} + +func (c *writerWithIndexer[T]) Write(ctx context.Context, block Block[T]) error { + // update indexes first (idempotent) + err := c.index(ctx, block) + if err != nil { + return err + } + + // write block, noop if block already written + err = c.writer.Write(ctx, block) + if err != nil { + return err + } + return nil +} + +func (c *writerWithIndexer[T]) Close(ctx context.Context) error { + err := c.indexer.Close(ctx) + if err != nil { + return err + } + return c.writer.Close(ctx) +} + +func (c *writerWithIndexer[T]) BlockNum() uint64 { + return min(c.writer.BlockNum(), c.indexer.BlockNum()) +} + +func (c *writerWithIndexer[T]) RollFile(ctx context.Context) error { + err := c.indexer.Flush(ctx) + if err != nil { + return err + } + return c.writer.RollFile(ctx) +} + +func (c *writerWithIndexer[T]) Options() Options { + return c.writer.Options() +} + +func (c *writerWithIndexer[T]) SetOptions(options Options) { + c.writer.SetOptions(options) +} + +func (c *writerWithIndexer[T]) index(ctx context.Context, block Block[T]) error { + return c.indexer.Index(ctx, block) +} diff --git a/writer_with_indexer_test.go b/writer_with_indexer_test.go new file mode 100644 index 0000000..3db6a0d --- /dev/null +++ b/writer_with_indexer_test.go @@ -0,0 +1,59 @@ +package ethwal + +import ( + "context" + "os" + "path" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestWriterWithIndexer(t *testing.T) { + defer func() { + _ = os.RemoveAll(testPath) + }() + + blocks := generateMixedIntBlocks() + + indexes := generateMixedIntIndexes() + + indexer, err := NewIndexer(context.Background(), IndexerOptions[[]int]{ + Dataset: Dataset{ + Path: testPath, + }, + Indexes: indexes, + }) + require.NoError(t, err) + + w, err := NewWriter[[]int](Options{ + Dataset: Dataset{ + Path: testPath, + }, + NewCompressor: NewZSTDCompressor, + NewEncoder: NewCBOREncoder, + }) + require.NoError(t, err) + + wi, err := NewWriterWithIndexer(w, indexer) + require.NoError(t, err) + + for _, block := range blocks { + err := wi.Write(context.Background(), block) + require.NoError(t, err) + } + + err = wi.RollFile(context.Background()) + require.NoError(t, err) + + err = wi.Close(context.Background()) + require.NoError(t, err) + + indexDirEntries, err := 
os.ReadDir(path.Join(testPath, ".indexes")) + require.NoError(t, err) + require.Len(t, indexDirEntries, 4) + + ethwalDirEntries, err := os.ReadDir(testPath) + require.NoError(t, err) + require.Len(t, ethwalDirEntries, 3) +}
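
Below is a minimal usage sketch of how the pieces introduced in this patch fit together, mirroring the tests above: an Indexer built from a set of Indexes, a Writer wrapped with NewWriterWithIndexer so index updates are flushed alongside file rolls, and reads driven through NewFilterBuilder and NewReaderWithFilter. The dataset path "example-wal" and the "parity" index with its index function are illustrative only, and error handling is elided for brevity.

package main

import (
	"context"
	"errors"
	"fmt"
	"io"

	"github.com/0xsequence/ethwal"
)

func main() {
	ctx := context.Background()
	dataset := ethwal.Dataset{Path: "example-wal"} // illustrative path

	// index each block by the parity of its data elements
	indexes := ethwal.Indexes[[]int]{
		"parity": ethwal.NewIndex[[]int]("parity", func(b ethwal.Block[[]int]) (bool, map[ethwal.IndexedValue][]uint16, error) {
			values := map[ethwal.IndexedValue][]uint16{}
			for i, v := range b.Data {
				key := ethwal.IndexedValue(fmt.Sprintf("%d", v%2))
				values[key] = append(values[key], uint16(i))
			}
			return len(values) > 0, values, nil
		}),
	}

	// write blocks and index them in one pass
	indexer, _ := ethwal.NewIndexer(ctx, ethwal.IndexerOptions[[]int]{Dataset: dataset, Indexes: indexes})
	w, _ := ethwal.NewWriter[[]int](ethwal.Options{Dataset: dataset, NewCompressor: ethwal.NewZSTDCompressor, NewEncoder: ethwal.NewCBOREncoder})
	wi, _ := ethwal.NewWriterWithIndexer(w, indexer)
	_ = wi.Write(ctx, ethwal.Block[[]int]{Number: 1, Data: []int{2, 3}})
	_ = wi.RollFile(ctx) // flushes pending index updates and rolls the WAL file
	_ = wi.Close(ctx)

	// read back only the blocks (and data positions) whose value is odd
	fb, _ := ethwal.NewFilterBuilder(ethwal.FilterBuilderOptions[[]int]{Dataset: dataset, Indexes: indexes})
	r, _ := ethwal.NewReader[[]int](ethwal.Options{Dataset: dataset, NewDecompressor: ethwal.NewZSTDDecompressor, NewDecoder: ethwal.NewCBORDecoder})
	fr, _ := ethwal.NewReaderWithFilter[[]int](r, fb.Eq("parity", "1"))
	for {
		block, err := fr.Read(ctx)
		if errors.Is(err, io.EOF) {
			break
		}
		fmt.Println(block.Number, block.Data) // Data is trimmed to the matching positions
	}
	_ = fr.Close()
}

Index files are written under the dataset's .indexes directory (IndexesDirectory), separate from the WAL files themselves, which is why the test above expects distinct directory entries for the two.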