From d7db68ca4f00ae606ee2b89faa8a44237972cd61 Mon Sep 17 00:00:00 2001 From: Marcin Gorzynski Date: Thu, 31 Oct 2024 14:18:52 +0100 Subject: [PATCH] refactor: small changes + unit tests --- filter.go | 2 +- filter_index_test.go | 153 ------------------------- filter_setup_test.go => filter_test.go | 147 +++++++++++++++++++++++- index.go | 18 +-- index_builder.go | 99 ---------------- indexer.go | 120 +++++++++++++++++++ reader_with_filter_test.go | 117 +++++++++++++++++++ writer_with_index_builder.go | 84 -------------- writer_with_indexer.go | 88 ++++++++++++++ writer_with_indexer_test.go | 55 +++++++++ 10 files changed, 534 insertions(+), 349 deletions(-) delete mode 100644 filter_index_test.go rename filter_setup_test.go => filter_test.go (55%) delete mode 100644 index_builder.go create mode 100644 indexer.go create mode 100644 reader_with_filter_test.go delete mode 100644 writer_with_index_builder.go create mode 100644 writer_with_indexer.go create mode 100644 writer_with_indexer_test.go diff --git a/filter.go b/filter.go index 4fbe257..aba39b4 100644 --- a/filter.go +++ b/filter.go @@ -89,7 +89,7 @@ func (c *filterBuilder[T]) Or(filters ...Filter) Filter { } func (c *filterBuilder[T]) Eq(index string, key string) Filter { - // fetch the IndexBlock and store it in the result set + // fetch the IndexBlock and index it in the result set index_ := IndexName(index).Normalize() idx, ok := c.indexes[index_] if !ok { diff --git a/filter_index_test.go b/filter_index_test.go deleted file mode 100644 index d0f6909..0000000 --- a/filter_index_test.go +++ /dev/null @@ -1,153 +0,0 @@ -package ethwal - -import ( - "context" - "math" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestMaxMagicCompoundID(t *testing.T) { - id := NewIndexCompoundID(uint64(math.Exp2(48)-1), math.MaxUint16) - assert.Equal(t, uint64(math.Exp2(48)-1), id.BlockNumber()) - assert.Equal(t, uint16(math.MaxUint16), id.DataIndex()) -} - -func TestIntMixFiltering(t *testing.T) { - _, indexes, _, cleanup, err := setupMockData("int_mix", generateMixedIntIndexes, generateMixedIntBlocks) - assert.NoError(t, err) - defer cleanup() - - f, err := NewFilterBuilder(indexes) - assert.NoError(t, err) - assert.NotNil(t, f) - - onlyEvenFilter := f.Eq("only_even", "true") - onlyOddFilter := f.Eq("only_odd", "true") - oddFilter := f.Eq("odd_even", "odd") - numbersIdxs := []string{ - "121", - "123", - "125", - "999", - "777", - "333", - "555", - "111", - } - var numberFilter Filter - for _, number := range numbersIdxs { - if numberFilter == nil { - numberFilter = f.Eq("all", number) - } else { - numberFilter = f.Or(numberFilter, f.Eq("all", number)) - } - } - - onlyEvenResults := onlyEvenFilter.Eval() - assert.Len(t, onlyEvenResults.Bitmap().ToArray(), 20) - for _, id := range onlyEvenResults.Bitmap().ToArray() { - block, _ := IndexCompoundID(id).Split() - assert.True(t, block <= 20) - } - - onlyOddResults := onlyOddFilter.Eval() - assert.Len(t, onlyOddResults.Bitmap().ToArray(), 20+20) - for _, id := range onlyOddResults.Bitmap().ToArray() { - block, _ := IndexCompoundID(id).Split() - assert.True(t, (block > 20 && block < 41) || (block > 50 && block < 71)) - } - - numberAllResults := numberFilter.Eval() - // 20*20 - assert.Len(t, numberAllResults.Bitmap().ToArray(), 400) - for _, id := range numberAllResults.Bitmap().ToArray() { - block, _ := IndexCompoundID(id).Split() - assert.True(t, block > 50 && block < 71) - } - - allNumberAndOdd := f.And(numberFilter, oddFilter) - allNumberOddResults := allNumberAndOdd.Eval() - 
assert.ElementsMatch(t, numberAllResults.Bitmap().ToArray(), allNumberOddResults.Bitmap().ToArray()) -} - -func TestFiltering(t *testing.T) { - _, indexes, _, cleanup, err := setupMockData("int_filtering", generateIntIndexes, generateIntBlocks) - assert.NoError(t, err) - defer cleanup() - - f, err := NewFilterBuilder(indexes) - assert.NoError(t, err) - assert.NotNil(t, f) - result := f.Or(f.And(f.Eq("all", "1"), f.Eq("all", "2")), f.Eq("all", "3")).Eval() - // result should contain block 1, 2, 3 - assert.Len(t, result.Bitmap().ToArray(), 3) - block, _ := result.Next() - assert.Equal(t, uint64(1), block) - block, _ = result.Next() - assert.Equal(t, uint64(2), block) - block, _ = result.Next() - assert.Equal(t, uint64(3), block) - - result = f.And(f.Eq("all", "1"), f.Eq("all", "2")).Eval() - // result should contain block 1 - assert.Len(t, result.Bitmap().ToArray(), 1) - block, _ = result.Next() - assert.Equal(t, uint64(1), block) -} - -func TestLowestIndexedBlockNum(t *testing.T) { - builder, indexes, fs, cleanup, err := setupMockData("int_indexing_num", generateIntIndexes, generateIntBlocks) - assert.NoError(t, err) - defer cleanup() - - blockNum, err := builder.LastIndexedBlockNum(context.Background()) - assert.NoError(t, err) - assert.Equal(t, uint64(99), blockNum) - - for _, i := range indexes { - i.numBlocksIndexed = nil - block, err := i.LastBlockNumIndexed(context.Background()) - assert.NoError(t, err) - assert.Equal(t, uint64(99), block) - } - - indexes = generateIntIndexes(fs) - builder, err = NewIndexBuilder(context.Background(), indexes) - assert.NoError(t, err) - lowestBlockIndexed, err := builder.LastIndexedBlockNum(context.Background()) - assert.NoError(t, err) - assert.Equal(t, uint64(99), lowestBlockIndexed) - - // add another filter... 
- // indexes["odd_even"] = NewIndex("odd_even", indexOddEvenBlocks) - // setup fresh objects - indexes["odd_even"] = NewIndex("odd_even", indexOddEvenBlocks, fs) - builder, err = NewIndexBuilder(context.Background(), indexes) - assert.NoError(t, err) - lowestBlockIndexed, err = builder.LastIndexedBlockNum(context.Background()) - assert.NoError(t, err) - assert.Equal(t, uint64(0), lowestBlockIndexed) - blocks := generateIntBlocks() - for _, block := range blocks[:50] { - err = builder.Index(context.Background(), block) - assert.NoError(t, err) - } - err = builder.Flush(context.Background()) - assert.NoError(t, err) - lowestBlockIndexed, err = builder.LastIndexedBlockNum(context.Background()) - assert.NoError(t, err) - assert.Equal(t, uint64(49), lowestBlockIndexed) - - // index more blocks - for _, block := range blocks[50:] { - err = builder.Index(context.Background(), block) - assert.NoError(t, err) - } - err = builder.Flush(context.Background()) - assert.NoError(t, err) - lowestBlockIndexed, err = builder.LastIndexedBlockNum(context.Background()) - assert.NoError(t, err) - assert.Equal(t, uint64(99), lowestBlockIndexed) -} diff --git a/filter_setup_test.go b/filter_test.go similarity index 55% rename from filter_setup_test.go rename to filter_test.go index bb8003e..041125c 100644 --- a/filter_setup_test.go +++ b/filter_test.go @@ -3,23 +3,25 @@ package ethwal import ( "context" "fmt" + "math" "os" "path" + "testing" "github.com/0xsequence/ethkit/go-ethereum/common" - "github.com/0xsequence/ethkit/go-ethereum/common/math" "github.com/0xsequence/ethwal/storage" "github.com/0xsequence/ethwal/storage/local" + "github.com/stretchr/testify/assert" ) var ( indexTestDir = ".tmp/ethwal_index_test" ) -func setupMockData[T any](subDir string, indexGenerator func(fs storage.FS) Indexes[T], blockGenerator func() []Block[T]) (*IndexBuilder[T], Indexes[T], storage.FS, func(), error) { +func setupMockData[T any](subDir string, indexGenerator func(fs storage.FS) Indexes[T], blockGenerator func() []Block[T]) (*Indexer[T], Indexes[T], storage.FS, func(), error) { fs := local.NewLocalFS(path.Join(indexTestDir, subDir)) indexes := indexGenerator(fs) - indexBuilder, err := NewIndexBuilder(context.Background(), indexes) + indexBuilder, err := NewIndexer(context.Background(), indexes) if err != nil { return nil, nil, nil, nil, err } @@ -248,3 +250,142 @@ func indexAll(block Block[[]int]) (toIndex bool, indexValueMap map[IndexedValue] func indexNone(block Block[[]int]) (toIndex bool, indexValueMap map[IndexedValue][]uint16, err error) { return false, nil, nil } + +func TestMaxMagicCompoundID(t *testing.T) { + id := NewIndexCompoundID(uint64(math.Exp2(48)-1), math.MaxUint16) + assert.Equal(t, uint64(math.Exp2(48)-1), id.BlockNumber()) + assert.Equal(t, uint16(math.MaxUint16), id.DataIndex()) +} + +func TestIntMixFiltering(t *testing.T) { + _, indexes, _, cleanup, err := setupMockData("int_mix", generateMixedIntIndexes, generateMixedIntBlocks) + assert.NoError(t, err) + defer cleanup() + + f, err := NewFilterBuilder(indexes) + assert.NoError(t, err) + assert.NotNil(t, f) + + onlyEvenFilter := f.Eq("only_even", "true") + onlyOddFilter := f.Eq("only_odd", "true") + oddFilter := f.Eq("odd_even", "odd") + numbersIdxs := []string{ + "121", + "123", + "125", + "999", + "777", + "333", + "555", + "111", + } + var numberFilter Filter + for _, number := range numbersIdxs { + if numberFilter == nil { + numberFilter = f.Eq("all", number) + } else { + numberFilter = f.Or(numberFilter, f.Eq("all", number)) + } + } + + 
onlyEvenResults := onlyEvenFilter.Eval() + assert.Len(t, onlyEvenResults.Bitmap().ToArray(), 20) + for _, id := range onlyEvenResults.Bitmap().ToArray() { + block, _ := IndexCompoundID(id).Split() + assert.True(t, block <= 20) + } + + onlyOddResults := onlyOddFilter.Eval() + assert.Len(t, onlyOddResults.Bitmap().ToArray(), 20+20) + for _, id := range onlyOddResults.Bitmap().ToArray() { + block, _ := IndexCompoundID(id).Split() + assert.True(t, (block > 20 && block < 41) || (block > 50 && block < 71)) + } + + numberAllResults := numberFilter.Eval() + // 20*20 + assert.Len(t, numberAllResults.Bitmap().ToArray(), 400) + for _, id := range numberAllResults.Bitmap().ToArray() { + block, _ := IndexCompoundID(id).Split() + assert.True(t, block > 50 && block < 71) + } + + allNumberAndOdd := f.And(numberFilter, oddFilter) + allNumberOddResults := allNumberAndOdd.Eval() + assert.ElementsMatch(t, numberAllResults.Bitmap().ToArray(), allNumberOddResults.Bitmap().ToArray()) +} + +func TestFiltering(t *testing.T) { + _, indexes, _, cleanup, err := setupMockData("int_filtering", generateIntIndexes, generateIntBlocks) + assert.NoError(t, err) + defer cleanup() + + f, err := NewFilterBuilder(indexes) + assert.NoError(t, err) + assert.NotNil(t, f) + result := f.Or(f.And(f.Eq("all", "1"), f.Eq("all", "2")), f.Eq("all", "3")).Eval() + // result should contain block 1, 2, 3 + assert.Len(t, result.Bitmap().ToArray(), 3) + block, _ := result.Next() + assert.Equal(t, uint64(1), block) + block, _ = result.Next() + assert.Equal(t, uint64(2), block) + block, _ = result.Next() + assert.Equal(t, uint64(3), block) + + result = f.And(f.Eq("all", "1"), f.Eq("all", "2")).Eval() + // result should contain block 1 + assert.Len(t, result.Bitmap().ToArray(), 1) + block, _ = result.Next() + assert.Equal(t, uint64(1), block) +} + +func TestLowestIndexedBlockNum(t *testing.T) { + indexer, indexes, fs, cleanup, err := setupMockData("int_indexing_num", generateIntIndexes, generateIntBlocks) + assert.NoError(t, err) + defer cleanup() + + blockNum := indexer.BlockNum() + assert.Equal(t, uint64(99), blockNum) + + for _, i := range indexes { + i.numBlocksIndexed = nil + block, err := i.LastBlockNumIndexed(context.Background()) + assert.NoError(t, err) + assert.Equal(t, uint64(99), block) + } + + indexes = generateIntIndexes(fs) + indexer, err = NewIndexer(context.Background(), indexes) + assert.NoError(t, err) + lowestBlockIndexed := indexer.BlockNum() + assert.Equal(t, uint64(99), lowestBlockIndexed) + + // add another filter... 
+ // indexes["odd_even"] = NewIndex("odd_even", indexOddEvenBlocks) + // setup fresh objects + indexes["odd_even"] = NewIndex("odd_even", indexOddEvenBlocks, fs) + indexer, err = NewIndexer(context.Background(), indexes) + assert.NoError(t, err) + lowestBlockIndexed = indexer.BlockNum() + assert.Equal(t, uint64(0), lowestBlockIndexed) + blocks := generateIntBlocks() + for _, block := range blocks[:50] { + err = indexer.Index(context.Background(), block) + assert.NoError(t, err) + } + err = indexer.Flush(context.Background()) + assert.NoError(t, err) + lowestBlockIndexed = indexer.BlockNum() + assert.Equal(t, uint64(49), lowestBlockIndexed) + + // index more blocks + for _, block := range blocks[50:] { + err = indexer.Index(context.Background(), block) + assert.NoError(t, err) + } + err = indexer.Flush(context.Background()) + assert.NoError(t, err) + lowestBlockIndexed = indexer.BlockNum() + assert.Equal(t, uint64(99), lowestBlockIndexed) +} diff --git a/index.go b/index.go index 2f48085..29090f9 100644 --- a/index.go +++ b/index.go @@ -51,10 +51,10 @@ func (i IndexName) Normalize() IndexName { return IndexName(strings.ToLower(string(i))) } -// IndexedValue is the value of an index. +// IndexedValue is the indexed value of an index. type IndexedValue string -// IndexUpdate is a map of index values to bitmaps. +// IndexUpdate is a map of indexed values and their corresponding bitmaps. type IndexUpdate struct { Data map[IndexedValue]*roaring64.Bitmap LastBlockNum uint64 @@ -190,7 +190,7 @@ func (i *Index[T]) Store(ctx context.Context, indexUpdate *IndexUpdate) error { err = i.storeLastBlockNumIndexed(ctx, indexUpdate.LastBlockNum) if err != nil { - return fmt.Errorf("failed to store number of blocks indexed: %w", err) + return fmt.Errorf("failed to index number of blocks indexed: %w", err) } return nil @@ -201,7 +201,7 @@ func (i *Index[T]) LastBlockNumIndexed(ctx context.Context) (uint64, error) { return i.numBlocksIndexed.Load(), nil } - file, err := i.fs.Open(ctx, lastBlockNumIndexedPath(string(i.name)), nil) + file, err := i.fs.Open(ctx, indexedBlockNumFilePath(string(i.name)), nil) if err != nil { // file doesn't exist return 0, nil @@ -236,7 +236,7 @@ func (i *Index[T]) storeLastBlockNumIndexed(ctx context.Context, numBlocksIndexe return nil } - file, err := i.fs.Create(ctx, lastBlockNumIndexedPath(string(i.name)), nil) + file, err := i.fs.Create(ctx, indexedBlockNumFilePath(string(i.name)), nil) if err != nil { return fmt.Errorf("failed to open IndexBlock file: %w", err) } @@ -258,6 +258,10 @@ func (i *Index[T]) storeLastBlockNumIndexed(ctx context.Context, numBlocksIndexe return nil } +func indexedBlockNumFilePath(index string) string { + return fmt.Sprintf("%s/%s", index, "indexed") +} + func indexPath(index string, indexValue string) string { hash := sha256.Sum224([]byte(indexValue)) return fmt.Sprintf("%s/%06d/%06d/%06d/%s", @@ -268,7 +272,3 @@ func indexPath(index string, indexValue string) string { fmt.Sprintf("%s.idx", indexValue), // filename ) } - -func lastBlockNumIndexedPath(index string) string { - return fmt.Sprintf("%s/%s", index, "indexed") -} diff --git a/index_builder.go b/index_builder.go deleted file mode 100644 index 4da461a..0000000 --- a/index_builder.go +++ /dev/null @@ -1,99 +0,0 @@ -package ethwal - -import ( - "context" - "fmt" - "sync" - - "github.com/RoaringBitmap/roaring/v2/roaring64" -) - -type IndexBuilder[T any] struct { - mu sync.Mutex - indexes map[IndexName]Index[T] - indexUpdates map[IndexName]*IndexUpdate -} - -func NewIndexBuilder[T any](ctx 
context.Context, indexes Indexes[T]) (*IndexBuilder[T], error) { - indexMaps := make(map[IndexName]*IndexUpdate) - for _, index := range indexes { - lastBlockNum, err := index.LastBlockNumIndexed(ctx) - if err != nil { - return nil, fmt.Errorf("IndexBuilder.NewIndexBuilder: failed to get last block number indexed for %s: %w", index.Name(), err) - } - - indexMaps[index.name] = &IndexUpdate{Data: make(map[IndexedValue]*roaring64.Bitmap), LastBlockNum: lastBlockNum} - } - return &IndexBuilder[T]{indexes: indexes, indexUpdates: indexMaps}, nil -} - -func (b *IndexBuilder[T]) Index(ctx context.Context, block Block[T]) error { - for _, index := range b.indexes { - bmUpdate, err := index.IndexBlock(ctx, block) - if err != nil { - return err - } - if bmUpdate == nil { - continue - } - - b.mu.Lock() - updateBatch := b.indexUpdates[index.name] - updateBatch.Merge(bmUpdate) - b.indexUpdates[index.name] = updateBatch - b.mu.Unlock() - } - - return nil -} - -func (b *IndexBuilder[T]) Flush(ctx context.Context) error { - b.mu.Lock() - defer b.mu.Unlock() - - for name, indexUpdate := range b.indexUpdates { - idx, ok := b.indexes[name] - if !ok { - continue - } - - err := idx.Store(ctx, indexUpdate) - if err != nil { - return err - } - } - - // clear indexUpdates - for _, index := range b.indexes { - b.indexUpdates[index.name].Data = make(map[IndexedValue]*roaring64.Bitmap) - } - return nil -} - -// LastIndexedBlockNum returns the lowest block number indexed by all indexes. If no blocks have been indexed, it returns 0. -// This is useful for determining the starting block number for a new IndexBuilder. -func (b *IndexBuilder[T]) LastIndexedBlockNum(ctx context.Context) (uint64, error) { - b.mu.Lock() - defer b.mu.Unlock() - - var lowestBlockNum *uint64 - for _, index := range b.indexes { - numBlocksIndexed, err := index.LastBlockNumIndexed(ctx) - if err != nil { - return 0, fmt.Errorf("IndexBuilder.LastIndexedBlockNum: failed to get number of blocks indexed: %w", err) - } - if lowestBlockNum == nil || numBlocksIndexed < *lowestBlockNum { - lowestBlockNum = &numBlocksIndexed - } - } - - if lowestBlockNum == nil { - return 0, nil - } - - return *lowestBlockNum, nil -} - -func (b *IndexBuilder[T]) Close(ctx context.Context) error { - return b.Flush(ctx) -} diff --git a/indexer.go b/indexer.go new file mode 100644 index 0000000..a84f05a --- /dev/null +++ b/indexer.go @@ -0,0 +1,120 @@ +package ethwal + +import ( + "context" + "fmt" + "math" + "sync" + + "github.com/RoaringBitmap/roaring/v2/roaring64" + "github.com/c2h5oh/datasize" + "golang.org/x/sync/errgroup" +) + +type Indexer[T any] struct { + mu sync.Mutex + indexes map[IndexName]Index[T] + indexUpdates map[IndexName]*IndexUpdate +} + +func NewIndexer[T any](ctx context.Context, indexes Indexes[T]) (*Indexer[T], error) { + indexMaps := make(map[IndexName]*IndexUpdate) + for _, index := range indexes { + lastBlockNum, err := index.LastBlockNumIndexed(ctx) + if err != nil { + return nil, fmt.Errorf("Indexer.NewIndexer: failed to get last block number indexed for %s: %w", index.Name(), err) + } + + indexMaps[index.name] = &IndexUpdate{Data: make(map[IndexedValue]*roaring64.Bitmap), LastBlockNum: lastBlockNum} + } + return &Indexer[T]{indexes: indexes, indexUpdates: indexMaps}, nil +} + +func (b *Indexer[T]) Index(ctx context.Context, block Block[T]) error { + for _, index := range b.indexes { + bmUpdate, err := index.IndexBlock(ctx, block) + if err != nil { + return err + } + if bmUpdate == nil { + continue + } + + b.mu.Lock() + updateBatch := 
b.indexUpdates[index.name] + updateBatch.Merge(bmUpdate) + b.indexUpdates[index.name] = updateBatch + b.mu.Unlock() + } + + return nil +} + +func (b *Indexer[T]) EstimatedBatchSize() datasize.ByteSize { + b.mu.Lock() + defer b.mu.Unlock() + + var size datasize.ByteSize = 0 + for _, indexUpdate := range b.indexUpdates { + for _, bm := range indexUpdate.Data { + size += datasize.ByteSize(bm.GetSizeInBytes()) + } + } + return size +} + +func (b *Indexer[T]) Flush(ctx context.Context) error { + b.mu.Lock() + defer b.mu.Unlock() + + errGrp, gCtx := errgroup.WithContext(ctx) + + for name, indexUpdate := range b.indexUpdates { + idx, ok := b.indexes[name] + if !ok { + continue + } + + errGrp.Go(func() error { + err := idx.Store(gCtx, indexUpdate) + if err != nil { + return err + } + return nil + }) + } + + err := errGrp.Wait() + if err != nil { + return fmt.Errorf("Indexer.Flush: failed to flush indexes: %w", err) + } + + // clear indexUpdates + for _, index := range b.indexes { + b.indexUpdates[index.name].Data = make(map[IndexedValue]*roaring64.Bitmap) + } + return nil +} + +// BlockNum returns the lowest block number indexed by all indexes. If no blocks have been indexed, it returns 0. +// This is useful for determining the starting block number for a new Indexer. +func (b *Indexer[T]) BlockNum() uint64 { + b.mu.Lock() + defer b.mu.Unlock() + + var lowestBlockNum uint64 = math.MaxUint64 + for _, indexUpdate := range b.indexUpdates { + if indexUpdate.LastBlockNum < lowestBlockNum { + lowestBlockNum = indexUpdate.LastBlockNum + } + } + + if lowestBlockNum == math.MaxUint64 { + return 0 + } + return lowestBlockNum +} + +func (b *Indexer[T]) Close(ctx context.Context) error { + return b.Flush(ctx) +} diff --git a/reader_with_filter_test.go b/reader_with_filter_test.go new file mode 100644 index 0000000..94e8ae5 --- /dev/null +++ b/reader_with_filter_test.go @@ -0,0 +1,117 @@ +package ethwal + +import ( + "context" + "errors" + "io" + "os" + "path" + "testing" + + "github.com/0xsequence/ethwal/storage/local" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func setupReaderWithFilterTest(t *testing.T) Indexes[[]int] { + opt := Options{ + Dataset: Dataset{ + Path: testPath, + }, + NewCompressor: NewZSTDCompressor, + NewDecompressor: NewZSTDDecompressor, + NewEncoder: NewCBOREncoder, + NewDecoder: NewCBORDecoder, + FileRollOnClose: true, + } + + w, err := NewWriter[[]int](opt) + require.NoError(t, err) + + blocks := generateMixedIntBlocks() + for _, block := range blocks { + err := w.Write(context.Background(), block) + require.NoError(t, err) + } + + w.Close(context.Background()) + + indexes := generateMixedIntIndexes(local.NewLocalFS(path.Join(testPath, ".indexes"))) + + ib, err := NewIndexer(context.Background(), indexes) + require.NoError(t, err) + + for _, block := range blocks { + err := ib.Index(context.Background(), block) + require.NoError(t, err) + } + + err = ib.Flush(context.Background()) + require.NoError(t, err) + + return indexes +} + +func teardownReaderWithFilterTest() { + _ = os.RemoveAll(testPath) +} + +func TestReaderWithFilter(t *testing.T) { + indexes := setupReaderWithFilterTest(t) + defer teardownReaderWithFilterTest() + + r, err := NewReader[[]int](Options{ + Dataset: Dataset{ + Path: testPath, + }, + NewDecompressor: NewZSTDDecompressor, + NewDecoder: NewCBORDecoder, + }) + require.NoError(t, err) + + fb, err := NewFilterBuilder(indexes) + require.NoError(t, err) + + r, err = NewReaderWithFilter[[]int](r, fb.Eq("only_even", "true")) + 
require.NoError(t, err) + + for { + block, err := r.Read(context.Background()) + if errors.Is(err, io.EOF) { + break + } + require.NoError(t, err) + + for _, i := range block.Data { + assert.Equal(t, 0, i%2) + } + } + + _ = r.Close() + + r, err = NewReader[[]int](Options{ + Dataset: Dataset{ + Path: testPath, + }, + NewDecompressor: NewZSTDDecompressor, + NewDecoder: NewCBORDecoder, + }) + require.NoError(t, err) + + r, err = NewReaderWithFilter[[]int](r, fb.Eq("only_odd", "true")) + require.NoError(t, err) + + for { + block, err := r.Read(context.Background()) + if errors.Is(err, io.EOF) { + break + } + require.NoError(t, err) + + for _, i := range block.Data { + assert.Equal(t, 1, i%2) + } + } + + _ = r.Close() +} diff --git a/writer_with_index_builder.go b/writer_with_index_builder.go deleted file mode 100644 index e4b44cd..0000000 --- a/writer_with_index_builder.go +++ /dev/null @@ -1,84 +0,0 @@ -package ethwal - -import ( - "context" - "log" - - "github.com/0xsequence/ethwal/storage" -) - -type writerWithFilter[T any] struct { - writer Writer[T] - indexBuilder *IndexBuilder[T] -} - -var _ Writer[any] = (*writerWithFilter[any])(nil) - -func NewWriterWithIndexBuilder[T any](ctx context.Context, writer Writer[T], indexes Indexes[T]) (Writer[T], error) { - indexBuilder, err := NewIndexBuilder[T](ctx, indexes) - opts := writer.Options() - wrappedPolicy := NewWrappedRollPolicy(opts.FileRollPolicy, func(ctx context.Context) { - err := indexBuilder.Flush(ctx) - if err != nil { - log.Default().Println("failed to flush index", "err", err) - } - }) - opts.FileRollPolicy = wrappedPolicy - writer.SetOptions(opts) - - if err != nil { - return nil, err - } - return &writerWithFilter[T]{indexBuilder: indexBuilder, writer: writer}, nil -} - -func (c *writerWithFilter[T]) FileSystem() storage.FS { - return c.writer.FileSystem() -} - -func (c *writerWithFilter[T]) Write(ctx context.Context, block Block[T]) error { - // update indexes first (idempotent) - err := c.store(ctx, block) - if err != nil { - return err - } - - // write block - err = c.writer.Write(ctx, block) - if err != nil { - return err - } - return nil -} - -func (c *writerWithFilter[T]) Close(ctx context.Context) error { - err := c.indexBuilder.Close(ctx) - if err != nil { - return err - } - return c.writer.Close(ctx) -} - -func (c *writerWithFilter[T]) BlockNum() uint64 { - return c.writer.BlockNum() -} - -func (c *writerWithFilter[T]) RollFile(ctx context.Context) error { - err := c.indexBuilder.Flush(ctx) - if err != nil { - return err - } - return c.writer.RollFile(ctx) -} - -func (c *writerWithFilter[T]) Options() Options { - return c.writer.Options() -} - -func (c *writerWithFilter[T]) SetOptions(options Options) { - c.writer.SetOptions(options) -} - -func (c *writerWithFilter[T]) store(ctx context.Context, block Block[T]) error { - return c.indexBuilder.Index(ctx, block) -} diff --git a/writer_with_indexer.go b/writer_with_indexer.go new file mode 100644 index 0000000..e9a9213 --- /dev/null +++ b/writer_with_indexer.go @@ -0,0 +1,88 @@ +package ethwal + +import ( + "context" + "fmt" + "log" + + "github.com/0xsequence/ethwal/storage" +) + +type writerWithIndexer[T any] struct { + writer Writer[T] + + indexer *Indexer[T] +} + +var _ Writer[any] = (*writerWithIndexer[any])(nil) + +func NewWriterWithIndexer[T any](writer Writer[T], indexer *Indexer[T]) (Writer[T], error) { + if writer.BlockNum() > indexer.BlockNum() { + // todo: implement a way to catch up indexer with writer + // this should never happen if the writer with indexer is 
used + return nil, fmt.Errorf("writer is ahead of indexer, can't catch up") + } + + opts := writer.Options() + wrappedPolicy := NewWrappedRollPolicy(opts.FileRollPolicy, func(ctx context.Context) { + err := indexer.Flush(ctx) + if err != nil { + log.Default().Println("failed to flush index", "err", err) + } + }) + opts.FileRollPolicy = wrappedPolicy + writer.SetOptions(opts) + + return &writerWithIndexer[T]{indexer: indexer, writer: writer}, nil +} + +func (c *writerWithIndexer[T]) FileSystem() storage.FS { + return c.writer.FileSystem() +} + +func (c *writerWithIndexer[T]) Write(ctx context.Context, block Block[T]) error { + // update indexes first (idempotent) + err := c.index(ctx, block) + if err != nil { + return err + } + + // write block, noop if block already written + err = c.writer.Write(ctx, block) + if err != nil { + return err + } + return nil +} + +func (c *writerWithIndexer[T]) Close(ctx context.Context) error { + err := c.indexer.Close(ctx) + if err != nil { + return err + } + return c.writer.Close(ctx) +} + +func (c *writerWithIndexer[T]) BlockNum() uint64 { + return min(c.writer.BlockNum(), c.indexer.BlockNum()) +} + +func (c *writerWithIndexer[T]) RollFile(ctx context.Context) error { + err := c.indexer.Flush(ctx) + if err != nil { + return err + } + return c.writer.RollFile(ctx) +} + +func (c *writerWithIndexer[T]) Options() Options { + return c.writer.Options() +} + +func (c *writerWithIndexer[T]) SetOptions(options Options) { + c.writer.SetOptions(options) +} + +func (c *writerWithIndexer[T]) index(ctx context.Context, block Block[T]) error { + return c.indexer.Index(ctx, block) +} diff --git a/writer_with_indexer_test.go b/writer_with_indexer_test.go new file mode 100644 index 0000000..54fd9a2 --- /dev/null +++ b/writer_with_indexer_test.go @@ -0,0 +1,55 @@ +package ethwal + +import ( + "context" + "os" + "path" + "testing" + + "github.com/0xsequence/ethwal/storage/local" + "github.com/stretchr/testify/require" +) + +func TestWriterWithIndexer(t *testing.T) { + defer func() { + _ = os.RemoveAll(testPath) + }() + + blocks := generateMixedIntBlocks() + + indexes := generateMixedIntIndexes(local.NewLocalFS(path.Join(testPath, ".indexes"))) + + indexer, err := NewIndexer(context.Background(), indexes) + require.NoError(t, err) + + w, err := NewWriter[[]int](Options{ + Dataset: Dataset{ + Path: testPath, + }, + NewCompressor: NewZSTDCompressor, + NewEncoder: NewCBOREncoder, + }) + require.NoError(t, err) + + wi, err := NewWriterWithIndexer(w, indexer) + require.NoError(t, err) + + for _, block := range blocks { + err := wi.Write(context.Background(), block) + require.NoError(t, err) + } + + err = wi.RollFile(context.Background()) + require.NoError(t, err) + + err = wi.Close(context.Background()) + require.NoError(t, err) + + indexDirEntries, err := os.ReadDir(path.Join(testPath, ".indexes")) + require.NoError(t, err) + require.Len(t, indexDirEntries, 4) + + ethwalDirEntries, err := os.ReadDir(testPath) + require.NoError(t, err) + require.Len(t, ethwalDirEntries, 3) +}
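A minimal sketch of the write path introduced by this patch, condensed from writer_with_indexer_test.go above. It assumes it sits in package ethwal next to the tests; the ".tmp/ethwal_example" dataset path and the generateMixedInt* helpers are placeholders borrowed from the test fixtures, not public API.

package ethwal // placed next to the tests so their helpers are in scope

import (
	"context"
	"path"

	"github.com/0xsequence/ethwal/storage/local"
)

// writePathSketch wires an Indexer (formerly IndexBuilder) into a Writer.
func writePathSketch(ctx context.Context) error {
	// Index definitions live on their own filesystem under ".indexes",
	// mirroring the layout used by the tests in this patch.
	indexes := generateMixedIntIndexes(local.NewLocalFS(path.Join(".tmp/ethwal_example", ".indexes")))

	indexer, err := NewIndexer(ctx, indexes)
	if err != nil {
		return err
	}

	w, err := NewWriter[[]int](Options{
		Dataset:       Dataset{Path: ".tmp/ethwal_example"},
		NewCompressor: NewZSTDCompressor,
		NewEncoder:    NewCBOREncoder,
	})
	if err != nil {
		return err
	}

	// NewWriterWithIndexer indexes each block before it is written and
	// flushes pending index updates whenever the file roll policy fires.
	wi, err := NewWriterWithIndexer(w, indexer)
	if err != nil {
		return err
	}

	for _, block := range generateMixedIntBlocks() {
		if err := wi.Write(ctx, block); err != nil {
			return err
		}
	}
	return wi.Close(ctx) // flushes the indexer, then closes the writer
}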
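A matching sketch of the read path, condensed from reader_with_filter_test.go, under the same assumptions (package ethwal, test fixtures, placeholder dataset path); the "only_even" index name comes from the mixed-int fixtures above.

package ethwal

import (
	"context"
	"errors"
	"io"
)

// readPathSketch reads back only the blocks matched by an index filter.
func readPathSketch(ctx context.Context, indexes Indexes[[]int]) error {
	r, err := NewReader[[]int](Options{
		Dataset:         Dataset{Path: ".tmp/ethwal_example"},
		NewDecompressor: NewZSTDDecompressor,
		NewDecoder:      NewCBORDecoder,
	})
	if err != nil {
		return err
	}

	fb, err := NewFilterBuilder(indexes)
	if err != nil {
		return err
	}

	// Wrap the reader so Read skips every block that is not present in the
	// filter's bitmap of compound (block number, data index) IDs.
	r, err = NewReaderWithFilter[[]int](r, fb.Eq("only_even", "true"))
	if err != nil {
		return err
	}

	for {
		block, err := r.Read(ctx)
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return err
		}
		_ = block // in the fixtures, block.Data here holds only even values
	}
	return r.Close()
}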