From ec47deffec69e812e3a0e72bf7e97aa1c0fa85c1 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Thu, 25 Jul 2024 20:43:11 +0200 Subject: [PATCH] add shwap core integration --- core/eds.go | 20 ++---------- core/exchange.go | 24 +++++---------- core/exchange_test.go | 58 ++++++++++++----------------------- core/listener.go | 17 ++++------ core/listener_no_race_test.go | 12 ++++++-- core/listener_test.go | 20 +++++++----- 6 files changed, 56 insertions(+), 95 deletions(-) diff --git a/core/eds.go b/core/eds.go index 058d54175d..d847deccd7 100644 --- a/core/eds.go +++ b/core/eds.go @@ -2,10 +2,8 @@ package core import ( "context" - "errors" "fmt" - "github.com/filecoin-project/dagstore" "github.com/tendermint/tendermint/types" "github.com/celestiaorg/celestia-app/app" @@ -19,8 +17,7 @@ import ( "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/eds" - "github.com/celestiaorg/celestia-node/share/ipld" + "github.com/celestiaorg/celestia-node/store" ) // extendBlock extends the given block data, returning the resulting @@ -58,26 +55,15 @@ func storeEDS( ctx context.Context, eh *header.ExtendedHeader, eds *rsmt2d.ExtendedDataSquare, - adder *ipld.ProofsAdder, - store *eds.Store, + store *store.Store, window pruner.AvailabilityWindow, ) error { - if eds.Equals(share.EmptyEDS()) { - return nil - } - if !pruner.IsWithinAvailabilityWindow(eh.Time(), window) { log.Debugw("skipping storage of historic block", "height", eh.Height()) return nil } - ctx = ipld.CtxWithProofsAdder(ctx, adder) - - err := store.Put(ctx, share.DataHash(eh.DataHash), eds) - if errors.Is(err, dagstore.ErrShardExists) { - // block with given root already exists, return nil - return nil - } + err := store.Put(ctx, eh.DAH, eh.Height(), eds) if err == nil { log.Debugw("stored EDS for height", "height", eh.Height()) } diff --git a/core/exchange.go b/core/exchange.go index 6a6cba94f0..390e47e1c7 100644 --- a/core/exchange.go +++ b/core/exchange.go @@ -9,19 +9,17 @@ import ( "golang.org/x/sync/errgroup" libhead "github.com/celestiaorg/go-header" - "github.com/celestiaorg/nmt" "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/pruner" - "github.com/celestiaorg/celestia-node/share/eds" - "github.com/celestiaorg/celestia-node/share/ipld" + "github.com/celestiaorg/celestia-node/store" ) const concurrencyLimit = 4 type Exchange struct { fetcher *BlockFetcher - store *eds.Store + store *store.Store construct header.ConstructFn availabilityWindow pruner.AvailabilityWindow @@ -31,7 +29,7 @@ type Exchange struct { func NewExchange( fetcher *BlockFetcher, - store *eds.Store, + store *store.Store, construct header.ConstructFn, opts ...Option, ) (*Exchange, error) { @@ -135,11 +133,7 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende return nil, fmt.Errorf("fetching block info for height %d: %w", &block.Height, err) } - // extend block data - adder := ipld.NewProofsAdder(int(block.Data.SquareSize), false) - defer adder.Purge() - - eds, err := extendBlock(block.Data, block.Header.Version.App, nmt.NodeVisitor(adder.VisitFn())) + eds, err := extendBlock(block.Data, block.Header.Version.App) if err != nil { return nil, fmt.Errorf("extending block data for height %d: %w", &block.Height, err) } @@ -154,7 +148,7 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende &block.Height, hash, eh.Hash()) } - err = storeEDS(ctx, eh, eds, adder, ce.store, ce.availabilityWindow) + err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow) if err != nil { return nil, err } @@ -180,11 +174,7 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64 } log.Debugw("fetched signed block from core", "height", b.Header.Height) - // extend block data - adder := ipld.NewProofsAdder(int(b.Data.SquareSize), false) - defer adder.Purge() - - eds, err := extendBlock(b.Data, b.Header.Version.App, nmt.NodeVisitor(adder.VisitFn())) + eds, err := extendBlock(b.Data, b.Header.Version.App) if err != nil { return nil, fmt.Errorf("extending block data for height %d: %w", b.Header.Height, err) } @@ -194,7 +184,7 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64 panic(fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err)) } - err = storeEDS(ctx, eh, eds, adder, ce.store, ce.availabilityWindow) + err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow) if err != nil { return nil, err } diff --git a/core/exchange_test.go b/core/exchange_test.go index db0b230e27..f20a20d45f 100644 --- a/core/exchange_test.go +++ b/core/exchange_test.go @@ -1,14 +1,11 @@ package core import ( - "bytes" "context" "testing" "time" "github.com/cosmos/cosmos-sdk/client/flags" - ds "github.com/ipfs/go-datastore" - ds_sync "github.com/ipfs/go-datastore/sync" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -17,7 +14,7 @@ import ( "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/eds" + "github.com/celestiaorg/celestia-node/store" ) func TestCoreExchange_RequestHeaders(t *testing.T) { @@ -30,7 +27,8 @@ func TestCoreExchange_RequestHeaders(t *testing.T) { generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx) - store := createStore(t) + store, err := store.NewStore(store.DefaultParameters(), t.TempDir()) + require.NoError(t, err) ce, err := NewExchange(fetcher, store, header.MakeExtendedHeader) require.NoError(t, err) @@ -56,7 +54,11 @@ func TestCoreExchange_RequestHeaders(t *testing.T) { assert.Equal(t, expectedLastHeightInRange, headers[len(headers)-1].Height()) for _, h := range headers { - has, err := store.Has(ctx, h.DAH.Hash()) + has, err := store.HasByHash(ctx, h.DAH.Hash()) + require.NoError(t, err) + assert.True(t, has) + + has, err = store.HasByHeight(ctx, h.Height()) require.NoError(t, err) assert.True(t, has) } @@ -74,7 +76,8 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) { generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx) - store := createStore(t) + store, err := store.NewStore(store.DefaultParameters(), t.TempDir()) + require.NoError(t, err) ce, err := NewExchange( fetcher, @@ -96,10 +99,15 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) { // ensure none of the "historic" EDSs were stored for _, h := range headers { - if bytes.Equal(h.DataHash, share.EmptyEDSRoots().Hash()) { + has, err := store.HasByHeight(ctx, h.Height()) + require.NoError(t, err) + assert.False(t, has) + + // empty EDSs are expected to exist in the store, so we skip them + if h.DAH.Equals(share.EmptyEDSRoots()) { continue } - has, err := store.Has(ctx, h.DAH.Hash()) + has, err = store.HasByHash(ctx, h.DAH.Hash()) require.NoError(t, err) assert.False(t, has) } @@ -114,32 +122,6 @@ func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testn return NewBlockFetcher(cctx.Client), cctx } -func createStore(t *testing.T) *eds.Store { - t.Helper() - - storeCfg := eds.DefaultParameters() - store, err := eds.NewStore(storeCfg, t.TempDir(), ds_sync.MutexWrap(ds.NewMapDatastore())) - require.NoError(t, err) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - err = store.Start(ctx) - require.NoError(t, err) - - // store an empty square to initialize EDS store - eds := share.EmptyEDS() - err = store.Put(ctx, share.EmptyEDSRoots().Hash(), eds) - require.NoError(t, err) - - t.Cleanup(func() { - err = store.Stop(ctx) - require.NoError(t, err) - }) - - return store -} - // fillBlocks fills blocks until the context is canceled. func fillBlocks( t *testing.T, @@ -187,10 +169,8 @@ func generateNonEmptyBlocks( case b, ok := <-sub: require.True(t, ok) - if !bytes.Equal(b.Data.Hash(), share.EmptyEDSRoots().Hash()) { - hashes = append(hashes, share.DataHash(b.Data.Hash())) - i++ - } + hashes = append(hashes, share.DataHash(b.Data.Hash())) + i++ case <-ctx.Done(): t.Fatal("failed to fill blocks within timeout") } diff --git a/core/listener.go b/core/listener.go index a9b87b6dd2..ece30188fa 100644 --- a/core/listener.go +++ b/core/listener.go @@ -12,13 +12,11 @@ import ( "go.opentelemetry.io/otel/attribute" libhead "github.com/celestiaorg/go-header" - "github.com/celestiaorg/nmt" "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/pruner" - "github.com/celestiaorg/celestia-node/share/eds" - "github.com/celestiaorg/celestia-node/share/ipld" - "github.com/celestiaorg/celestia-node/share/p2p/shrexsub" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub" + "github.com/celestiaorg/celestia-node/store" ) var ( @@ -39,7 +37,7 @@ type Listener struct { fetcher *BlockFetcher construct header.ConstructFn - store *eds.Store + store *store.Store availabilityWindow pruner.AvailabilityWindow headerBroadcaster libhead.Broadcaster[*header.ExtendedHeader] @@ -58,7 +56,7 @@ func NewListener( fetcher *BlockFetcher, hashBroadcaster shrexsub.BroadcastFn, construct header.ConstructFn, - store *eds.Store, + store *store.Store, blocktime time.Duration, opts ...Option, ) (*Listener, error) { @@ -214,11 +212,8 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS span.SetAttributes( attribute.Int64("height", b.Header.Height), ) - // extend block data - adder := ipld.NewProofsAdder(int(b.Data.SquareSize), false) - defer adder.Purge() - eds, err := extendBlock(b.Data, b.Header.Version.App, nmt.NodeVisitor(adder.VisitFn())) + eds, err := extendBlock(b.Data, b.Header.Version.App) if err != nil { return fmt.Errorf("extending block data: %w", err) } @@ -229,7 +224,7 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS panic(fmt.Errorf("making extended header: %w", err)) } - err = storeEDS(ctx, eh, eds, adder, cl.store, cl.availabilityWindow) + err = storeEDS(ctx, eh, eds, cl.store, cl.availabilityWindow) if err != nil { return fmt.Errorf("storing EDS: %w", err) } diff --git a/core/listener_no_race_test.go b/core/listener_no_race_test.go index 3e5d63ab52..a30dc5d3e5 100644 --- a/core/listener_no_race_test.go +++ b/core/listener_no_race_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/store" ) // TestListenerWithNonEmptyBlocks ensures that non-empty blocks are actually @@ -29,11 +30,12 @@ func TestListenerWithNonEmptyBlocks(t *testing.T) { fetcher, cctx := createCoreFetcher(t, cfg) eds := createEdsPubSub(ctx, t) - store := createStore(t) + store, err := store.NewStore(store.DefaultParameters(), t.TempDir()) + require.NoError(t, err) // create Listener and start listening cl := createListener(ctx, t, fetcher, ps0, eds, store, testChainID) - err := cl.Start(ctx) + err = cl.Start(ctx) require.NoError(t, err) // listen for eds hashes broadcasted through eds-sub and ensure store has @@ -54,7 +56,11 @@ func TestListenerWithNonEmptyBlocks(t *testing.T) { continue } - has, err := store.Has(ctx, msg.DataHash) + has, err := store.HasByHash(ctx, msg.DataHash) + require.NoError(t, err) + require.True(t, has) + + has, err = store.HasByHeight(ctx, msg.Height) require.NoError(t, err) require.True(t, has) } diff --git a/core/listener_test.go b/core/listener_test.go index 9850efec5b..d792fa57a1 100644 --- a/core/listener_test.go +++ b/core/listener_test.go @@ -16,8 +16,8 @@ import ( "github.com/celestiaorg/celestia-node/header" nodep2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p" "github.com/celestiaorg/celestia-node/pruner" - "github.com/celestiaorg/celestia-node/share/eds" - "github.com/celestiaorg/celestia-node/share/p2p/shrexsub" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub" + "github.com/celestiaorg/celestia-node/store" ) const testChainID = "private" @@ -52,7 +52,9 @@ func TestListener(t *testing.T) { eds := createEdsPubSub(ctx, t) // create Listener and start listening - cl := createListener(ctx, t, fetcher, ps0, eds, createStore(t), testChainID) + store, err := store.NewStore(store.DefaultParameters(), t.TempDir()) + require.NoError(t, err) + cl := createListener(ctx, t, fetcher, ps0, eds, store, testChainID) err = cl.Start(ctx) require.NoError(t, err) @@ -84,7 +86,8 @@ func TestListenerWithWrongChainRPC(t *testing.T) { fetcher, _ := createCoreFetcher(t, cfg) eds := createEdsPubSub(ctx, t) - store := createStore(t) + store, err := store.NewStore(store.DefaultParameters(), t.TempDir()) + require.NoError(t, err) // create Listener and start listening cl := createListener(ctx, t, fetcher, ps0, eds, store, "wrong-chain-rpc") @@ -111,7 +114,8 @@ func TestListener_DoesNotStoreHistoric(t *testing.T) { fetcher, cctx := createCoreFetcher(t, cfg) eds := createEdsPubSub(ctx, t) - store := createStore(t) + store, err := store.NewStore(store.DefaultParameters(), t.TempDir()) + require.NoError(t, err) // create Listener and start listening opt := WithAvailabilityWindow(pruner.AvailabilityWindow(time.Nanosecond)) @@ -119,12 +123,12 @@ func TestListener_DoesNotStoreHistoric(t *testing.T) { dataRoots := generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx) - err := cl.Start(ctx) + err = cl.Start(ctx) require.NoError(t, err) // ensure none of the EDSes were stored for _, hash := range dataRoots { - has, err := store.Has(ctx, hash) + has, err := store.HasByHash(ctx, hash) require.NoError(t, err) assert.False(t, has) } @@ -171,7 +175,7 @@ func createListener( fetcher *BlockFetcher, ps *pubsub.PubSub, edsSub *shrexsub.PubSub, - store *eds.Store, + store *store.Store, chainID string, opts ...Option, ) *Listener {