From 26d411a53334454bc03ac5b84cdca2e349d18478 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Wed, 10 Apr 2024 14:23:29 +0200 Subject: [PATCH] feat(core): Bridge node short-circuits storing a historical EDS if pruning is enabled (#3283) This PR implements short-circuiting storing EDSes in the case that they are outside the sampling window in the case of a syncing bridge node if `--experimental-pruning` is enabled. Overrides #3261 --- core/eds.go | 25 +++++- core/exchange.go | 28 ++++--- core/exchange_test.go | 142 +++++++++++++++++++++++++++++++--- core/listener.go | 33 ++++---- core/listener_no_race_test.go | 12 +-- core/listener_test.go | 58 ++++++++++---- core/option.go | 22 +++++- nodebuilder/core/module.go | 6 +- nodebuilder/pruner/module.go | 21 ++++- pruner/window.go | 10 +++ 10 files changed, 286 insertions(+), 71 deletions(-) diff --git a/core/eds.go b/core/eds.go index eb93c249ba..c5f9ed6075 100644 --- a/core/eds.go +++ b/core/eds.go @@ -16,8 +16,11 @@ import ( "github.com/celestiaorg/nmt" "github.com/celestiaorg/rsmt2d" + "github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/eds" + "github.com/celestiaorg/celestia-node/share/ipld" ) // extendBlock extends the given block data, returning the resulting @@ -51,14 +54,32 @@ func extendShares(s [][]byte, options ...nmt.Option) (*rsmt2d.ExtendedDataSquare } // storeEDS will only store extended block if it is not empty and doesn't already exist. -func storeEDS(ctx context.Context, hash share.DataHash, eds *rsmt2d.ExtendedDataSquare, store *eds.Store) error { +func storeEDS( + ctx context.Context, + eh *header.ExtendedHeader, + eds *rsmt2d.ExtendedDataSquare, + adder *ipld.ProofsAdder, + store *eds.Store, + window pruner.AvailabilityWindow, +) error { if eds == nil { return nil } - err := store.Put(ctx, hash, eds) + + if !pruner.IsWithinAvailabilityWindow(eh.Time(), window) { + log.Debugw("skipping storage of historic block", "height", eh.Height()) + return nil + } + + ctx = ipld.CtxWithProofsAdder(ctx, adder) + + err := store.Put(ctx, share.DataHash(eh.DataHash), eds) if errors.Is(err, dagstore.ErrShardExists) { // block with given root already exists, return nil return nil } + if err == nil { + log.Debugw("stored EDS for height", "height", eh.Height()) + } return err } diff --git a/core/exchange.go b/core/exchange.go index cf889a38bb..61dd701bb4 100644 --- a/core/exchange.go +++ b/core/exchange.go @@ -12,6 +12,7 @@ import ( "github.com/celestiaorg/nmt" "github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/ipld" ) @@ -23,6 +24,8 @@ type Exchange struct { store *eds.Store construct header.ConstructFn + availabilityWindow pruner.AvailabilityWindow + metrics *exchangeMetrics } @@ -32,9 +35,9 @@ func NewExchange( construct header.ConstructFn, opts ...Option, ) (*Exchange, error) { - p := new(params) + p := defaultParams() for _, opt := range opts { - opt(p) + opt(&p) } var ( @@ -49,10 +52,11 @@ func NewExchange( } return &Exchange{ - fetcher: fetcher, - store: store, - construct: construct, - metrics: metrics, + fetcher: fetcher, + store: store, + construct: construct, + availabilityWindow: p.availabilityWindow, + metrics: metrics, }, nil } @@ -150,11 +154,11 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende 
&block.Height, hash, eh.Hash()) } - ctx = ipld.CtxWithProofsAdder(ctx, adder) - err = storeEDS(ctx, eh.DAH.Hash(), eds, ce.store) + err = storeEDS(ctx, eh, eds, adder, ce.store, ce.availabilityWindow) if err != nil { - return nil, fmt.Errorf("storing EDS to eds.Store for height %d: %w", &block.Height, err) + return nil, err } + return eh, nil } @@ -190,10 +194,10 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64 panic(fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err)) } - ctx = ipld.CtxWithProofsAdder(ctx, adder) - err = storeEDS(ctx, eh.DAH.Hash(), eds, ce.store) + err = storeEDS(ctx, eh, eds, adder, ce.store, ce.availabilityWindow) if err != nil { - return nil, fmt.Errorf("storing EDS to eds.Store for block height %d: %w", b.Header.Height, err) + return nil, err } + return eh, nil } diff --git a/core/exchange_test.go b/core/exchange_test.go index 95c7f83385..f397ed6a5b 100644 --- a/core/exchange_test.go +++ b/core/exchange_test.go @@ -1,10 +1,12 @@ package core import ( + "bytes" "context" "testing" "time" + "github.com/cosmos/cosmos-sdk/client/flags" ds "github.com/ipfs/go-datastore" ds_sync "github.com/ipfs/go-datastore/sync" "github.com/stretchr/testify/assert" @@ -13,6 +15,8 @@ import ( "github.com/celestiaorg/celestia-app/test/util/testnode" "github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/pruner" + "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/eds" ) @@ -21,11 +25,10 @@ func TestCoreExchange_RequestHeaders(t *testing.T) { t.Cleanup(cancel) cfg := DefaultTestConfig() - cfg.ChainID = networkID - fetcher, _ := createCoreFetcher(t, cfg) + cfg.ChainID = testChainID + fetcher, cctx := createCoreFetcher(t, cfg) - // generate 10 blocks - generateBlocks(t, fetcher) + generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx) store := createStore(t) @@ -39,18 +42,67 @@ func TestCoreExchange_RequestHeaders(t *testing.T) { genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes()) require.NoError(t, err) - to := uint64(10) + to := uint64(30) expectedFirstHeightInRange := genHeader.Height() + 1 expectedLastHeightInRange := to - 1 expectedLenHeaders := to - expectedFirstHeightInRange - // request headers from height 1 to 10 [2:10) + // request headers from height 1 to 20 [2:30) headers, err := ce.GetRangeByHeight(context.Background(), genHeader, to) require.NoError(t, err) assert.Len(t, headers, int(expectedLenHeaders)) assert.Equal(t, expectedFirstHeightInRange, headers[0].Height()) assert.Equal(t, expectedLastHeightInRange, headers[len(headers)-1].Height()) + + for _, h := range headers { + has, err := store.Has(ctx, h.DAH.Hash()) + require.NoError(t, err) + assert.True(t, has) + } +} + +// TestExchange_DoNotStoreHistoric tests that the CoreExchange will not +// store EDSs that are historic if pruning is enabled. 
+func TestExchange_DoNotStoreHistoric(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + cfg := DefaultTestConfig() + cfg.ChainID = testChainID + fetcher, cctx := createCoreFetcher(t, cfg) + + generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx) + + store := createStore(t) + + ce, err := NewExchange( + fetcher, + store, + header.MakeExtendedHeader, + WithAvailabilityWindow(pruner.AvailabilityWindow(time.Nanosecond)), // all blocks will be "historic" + ) + require.NoError(t, err) + + // initialize store with genesis block + genHeight := int64(1) + genBlock, err := fetcher.GetBlock(ctx, &genHeight) + require.NoError(t, err) + genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes()) + require.NoError(t, err) + + headers, err := ce.GetRangeByHeight(ctx, genHeader, 30) + require.NoError(t, err) + + // ensure none of the "historic" EDSs were stored + for _, h := range headers { + if bytes.Equal(h.DataHash, share.EmptyRoot().Hash()) { + continue + } + has, err := store.Has(ctx, h.DAH.Hash()) + require.NoError(t, err) + assert.False(t, has) + } } func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testnode.Context) { @@ -68,14 +120,82 @@ func createStore(t *testing.T) *eds.Store { storeCfg := eds.DefaultParameters() store, err := eds.NewStore(storeCfg, t.TempDir(), ds_sync.MutexWrap(ds.NewMapDatastore())) require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err = store.Start(ctx) + require.NoError(t, err) + + // store an empty square to initialize EDS store + eds := share.EmptyExtendedDataSquare() + err = store.Put(ctx, share.EmptyRoot().Hash(), eds) + require.NoError(t, err) + + t.Cleanup(func() { + err = store.Stop(ctx) + require.NoError(t, err) + }) + return store } -func generateBlocks(t *testing.T, fetcher *BlockFetcher) { - sub, err := fetcher.SubscribeNewBlockEvent(context.Background()) - require.NoError(t, err) +// fillBlocks fills blocks until the context is canceled. 
+func fillBlocks( + t *testing.T, + ctx context.Context, + cfg *testnode.Config, + cctx testnode.Context, +) { + for { + select { + case <-ctx.Done(): + return + default: + } + + _, err := cctx.FillBlock(16, cfg.Accounts, flags.BroadcastBlock) + require.NoError(t, err) + } +} - for i := 0; i < 10; i++ { - <-sub +// generateNonEmptyBlocks generates at least 20 non-empty blocks +func generateNonEmptyBlocks( + t *testing.T, + ctx context.Context, + fetcher *BlockFetcher, + cfg *testnode.Config, + cctx testnode.Context, +) []share.DataHash { + // generate several non-empty blocks + generateCtx, generateCtxCancel := context.WithCancel(context.Background()) + + sub, err := fetcher.SubscribeNewBlockEvent(ctx) + require.NoError(t, err) + defer func() { + err = fetcher.UnsubscribeNewBlockEvent(ctx) + require.NoError(t, err) + }() + + go fillBlocks(t, generateCtx, cfg, cctx) + + hashes := make([]share.DataHash, 0, 20) + + i := 0 + for i < 20 { + select { + case b, ok := <-sub: + require.True(t, ok) + + if !bytes.Equal(b.Data.Hash(), share.EmptyRoot().Hash()) { + hashes = append(hashes, share.DataHash(b.Data.Hash())) + i++ + } + case <-ctx.Done(): + t.Fatal("failed to fill blocks within timeout") + } } + generateCtxCancel() + + return hashes } diff --git a/core/listener.go b/core/listener.go index 367aa34181..08e36d941f 100644 --- a/core/listener.go +++ b/core/listener.go @@ -15,6 +15,7 @@ import ( "github.com/celestiaorg/nmt" "github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/ipld" "github.com/celestiaorg/celestia-node/share/p2p/shrexsub" @@ -37,8 +38,9 @@ var ( type Listener struct { fetcher *BlockFetcher - construct header.ConstructFn - store *eds.Store + construct header.ConstructFn + store *eds.Store + availabilityWindow pruner.AvailabilityWindow headerBroadcaster libhead.Broadcaster[*header.ExtendedHeader] hashBroadcaster shrexsub.BroadcastFn @@ -60,9 +62,9 @@ func NewListener( blocktime time.Duration, opts ...Option, ) (*Listener, error) { - p := new(params) + p := defaultParams() for _, opt := range opts { - opt(p) + opt(&p) } var ( @@ -77,21 +79,22 @@ func NewListener( } return &Listener{ - fetcher: fetcher, - headerBroadcaster: bcast, - hashBroadcaster: hashBroadcaster, - construct: construct, - store: store, - listenerTimeout: 5 * blocktime, - metrics: metrics, - chainID: p.chainID, + fetcher: fetcher, + headerBroadcaster: bcast, + hashBroadcaster: hashBroadcaster, + construct: construct, + store: store, + availabilityWindow: p.availabilityWindow, + listenerTimeout: 5 * blocktime, + metrics: metrics, + chainID: p.chainID, }, nil } // Start kicks off the Listener listener loop. 
func (cl *Listener) Start(context.Context) error { if cl.cancel != nil { - return errors.New("listener: already started") + return fmt.Errorf("listener: already started") } ctx, cancel := context.WithCancel(context.Background()) @@ -221,9 +224,7 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS panic(fmt.Errorf("making extended header: %w", err)) } - // attempt to store block data if not empty - ctx = ipld.CtxWithProofsAdder(ctx, adder) - err = storeEDS(ctx, b.Header.DataHash.Bytes(), eds, cl.store) + err = storeEDS(ctx, eh, eds, adder, cl.store, cl.availabilityWindow) if err != nil { return fmt.Errorf("storing EDS: %w", err) } diff --git a/core/listener_no_race_test.go b/core/listener_no_race_test.go index eac12785ee..3c261a7261 100644 --- a/core/listener_no_race_test.go +++ b/core/listener_no_race_test.go @@ -25,21 +25,15 @@ func TestListenerWithNonEmptyBlocks(t *testing.T) { // create one block to store as Head in local store and then unsubscribe from block events cfg := DefaultTestConfig() - cfg.ChainID = networkID + cfg.ChainID = testChainID fetcher, cctx := createCoreFetcher(t, cfg) eds := createEdsPubSub(ctx, t) store := createStore(t) - err := store.Start(ctx) - require.NoError(t, err) - t.Cleanup(func() { - err = store.Stop(ctx) - require.NoError(t, err) - }) // create Listener and start listening - cl := createListener(ctx, t, fetcher, ps0, eds, store, networkID) - err = cl.Start(ctx) + cl := createListener(ctx, t, fetcher, ps0, eds, store, testChainID) + err := cl.Start(ctx) require.NoError(t, err) // listen for eds hashes broadcasted through eds-sub and ensure store has diff --git a/core/listener_test.go b/core/listener_test.go index b3ed11e571..0c6bc7f256 100644 --- a/core/listener_test.go +++ b/core/listener_test.go @@ -15,11 +15,12 @@ import ( "github.com/celestiaorg/celestia-node/header" nodep2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p" + "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/p2p/shrexsub" ) -const networkID = "private" +const testChainID = "private" // TestListener tests the lifecycle of the core listener. 
func TestListener(t *testing.T) { @@ -31,7 +32,7 @@ func TestListener(t *testing.T) { subscriber, err := p2p.NewSubscriber[*header.ExtendedHeader]( ps1, header.MsgID, - p2p.WithSubscriberNetworkID(networkID), + p2p.WithSubscriberNetworkID(testChainID), ) require.NoError(t, err) err = subscriber.SetVerifier(func(context.Context, *header.ExtendedHeader) error { @@ -45,13 +46,13 @@ func TestListener(t *testing.T) { // create one block to store as Head in local store and then unsubscribe from block events cfg := DefaultTestConfig() - cfg.ChainID = networkID + cfg.ChainID = testChainID fetcher, _ := createCoreFetcher(t, cfg) eds := createEdsPubSub(ctx, t) // create Listener and start listening - cl := createListener(ctx, t, fetcher, ps0, eds, createStore(t), networkID) + cl := createListener(ctx, t, fetcher, ps0, eds, createStore(t), testChainID) err = cl.Start(ctx) require.NoError(t, err) @@ -79,17 +80,11 @@ func TestListenerWithWrongChainRPC(t *testing.T) { // create one block to store as Head in local store and then unsubscribe from block events cfg := DefaultTestConfig() - cfg.ChainID = networkID + cfg.ChainID = testChainID fetcher, _ := createCoreFetcher(t, cfg) eds := createEdsPubSub(ctx, t) store := createStore(t) - err := store.Start(ctx) - require.NoError(t, err) - t.Cleanup(func() { - err = store.Stop(ctx) - require.NoError(t, err) - }) // create Listener and start listening cl := createListener(ctx, t, fetcher, ps0, eds, store, "wrong-chain-rpc") @@ -100,6 +95,42 @@ func TestListenerWithWrongChainRPC(t *testing.T) { assert.ErrorIs(t, err, errInvalidSubscription) } +// TestListener_DoesNotStoreHistoric tests the (unlikely) case that +// blocks come through the listener's subscription that are actually +// older than the sampling window. +func TestListener_DoesNotStoreHistoric(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + t.Cleanup(cancel) + + // create mocknet with two pubsub endpoints + ps0, _ := createMocknetWithTwoPubsubEndpoints(ctx, t) + + // create one block to store as Head in local store and then unsubscribe from block events + cfg := DefaultTestConfig() + cfg.ChainID = testChainID + fetcher, cctx := createCoreFetcher(t, cfg) + eds := createEdsPubSub(ctx, t) + + store := createStore(t) + + // create Listener and start listening + opt := WithAvailabilityWindow(pruner.AvailabilityWindow(time.Nanosecond)) + cl := createListener(ctx, t, fetcher, ps0, eds, store, testChainID, opt) + + dataRoots := generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx) + + err := cl.Start(ctx) + require.NoError(t, err) + + // ensure none of the EDSes were stored + for _, hash := range dataRoots { + has, err := store.Has(ctx, hash) + require.NoError(t, err) + assert.False(t, has) + } + +} + func createMocknetWithTwoPubsubEndpoints(ctx context.Context, t *testing.T) (*pubsub.PubSub, *pubsub.PubSub) { net, err := mocknet.FullMeshLinked(2) require.NoError(t, err) @@ -143,8 +174,9 @@ func createListener( edsSub *shrexsub.PubSub, store *eds.Store, chainID string, + opts ...Option, ) *Listener { - p2pSub, err := p2p.NewSubscriber[*header.ExtendedHeader](ps, header.MsgID, p2p.WithSubscriberNetworkID(networkID)) + p2pSub, err := p2p.NewSubscriber[*header.ExtendedHeader](ps, header.MsgID, p2p.WithSubscriberNetworkID(testChainID)) require.NoError(t, err) err = p2pSub.Start(ctx) @@ -158,7 +190,7 @@ func createListener( }) listener, err := NewListener(p2pSub, fetcher, edsSub.Broadcast, header.MakeExtendedHeader, - store, nodep2p.BlockTime, 
WithChainID(nodep2p.Network(chainID))) + store, nodep2p.BlockTime, append(opts, WithChainID(nodep2p.Network(chainID)))...) require.NoError(t, err) return listener } diff --git a/core/option.go b/core/option.go index 6916ced4d8..e209d75aca 100644 --- a/core/option.go +++ b/core/option.go @@ -1,13 +1,23 @@ package core -import "github.com/celestiaorg/celestia-node/nodebuilder/p2p" +import ( + "github.com/celestiaorg/celestia-node/nodebuilder/p2p" + "github.com/celestiaorg/celestia-node/pruner" + "github.com/celestiaorg/celestia-node/pruner/archival" +) type Option func(*params) type params struct { - metrics bool + metrics bool + chainID string + availabilityWindow pruner.AvailabilityWindow +} - chainID string +func defaultParams() params { + return params{ + availabilityWindow: archival.Window, + } } // WithMetrics is a functional option that enables metrics @@ -23,3 +33,9 @@ func WithChainID(id p2p.Network) Option { p.chainID = id.String() } } + +func WithAvailabilityWindow(window pruner.AvailabilityWindow) Option { + return func(p *params) { + p.availabilityWindow = window + } +} diff --git a/nodebuilder/core/module.go b/nodebuilder/core/module.go index fcf682cdf5..0b0c409406 100644 --- a/nodebuilder/core/module.go +++ b/nodebuilder/core/module.go @@ -40,8 +40,8 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option fetcher *core.BlockFetcher, store *eds.Store, construct header.ConstructFn, + opts []core.Option, ) (*core.Exchange, error) { - var opts []core.Option if MetricsEnabled { opts = append(opts, core.WithMetrics()) } @@ -57,8 +57,10 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option construct header.ConstructFn, store *eds.Store, chainID p2p.Network, + opts []core.Option, ) (*core.Listener, error) { - opts := []core.Option{core.WithChainID(chainID)} + opts = append(opts, core.WithChainID(chainID)) + if MetricsEnabled { opts = append(opts, core.WithMetrics()) } diff --git a/nodebuilder/pruner/module.go b/nodebuilder/pruner/module.go index 248798c3a4..76d0ebb430 100644 --- a/nodebuilder/pruner/module.go +++ b/nodebuilder/pruner/module.go @@ -5,6 +5,7 @@ import ( "go.uber.org/fx" + "github.com/celestiaorg/celestia-node/core" "github.com/celestiaorg/celestia-node/nodebuilder/node" "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/pruner/archival" @@ -19,9 +20,20 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { case node.Light: // light nodes are still subject to sampling within window // even if pruning is not enabled. - return fx.Supply(light.Window) - case node.Full, node.Bridge: - return fx.Supply(archival.Window) + return fx.Options( + fx.Supply(light.Window), + ) + case node.Full: + return fx.Options( + fx.Supply(archival.Window), + ) + case node.Bridge: + return fx.Options( + fx.Supply(archival.Window), + fx.Provide(func() []core.Option { + return []core.Option{} + }), + ) default: panic("unknown node type") } @@ -58,6 +70,9 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { return full.NewPruner(store) }), fx.Supply(full.Window), + fx.Provide(func(window pruner.AvailabilityWindow) []core.Option { + return []core.Option{core.WithAvailabilityWindow(window)} + }), ) // TODO: Eventually, light nodes will be capable of pruning samples // in which case, this can be enabled. 
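
Note for reviewers unfamiliar with the nodebuilder wiring: the module.go changes above make the pruner module the single place that decides which []core.Option reaches the core module (empty for an archival bridge node, WithAvailabilityWindow(full.Window) when pruning is enabled). The sketch below is a minimal, self-contained illustration of that dependency flow; Option and params are local stand-ins for core.Option and core.params, the 30-day duration is purely an example, and the real fx graph contains many more providers.

package main

import (
	"fmt"
	"log"
	"time"

	"go.uber.org/fx"
)

// Local stand-ins for core.Option / core.params, used only to keep this
// sketch self-contained.
type params struct{ window time.Duration }

type Option func(*params)

func WithAvailabilityWindow(w time.Duration) Option {
	return func(p *params) { p.window = w }
}

func main() {
	app := fx.New(
		// Roughly what nodebuilder/pruner provides for a bridge node when
		// --experimental-pruning is enabled (30 days here is illustrative only).
		fx.Provide(func() []Option {
			return []Option{WithAvailabilityWindow(30 * 24 * time.Hour)}
		}),
		// Roughly what nodebuilder/core now consumes when constructing the
		// Exchange and Listener: the injected options are applied over defaults.
		fx.Invoke(func(opts []Option) {
			p := params{window: time.Duration(0)} // archival default: no pruning
			for _, opt := range opts {
				opt(&p)
			}
			fmt.Println("availability window passed to core:", p.window)
		}),
	)
	if err := app.Err(); err != nil {
		log.Fatal(err)
	}
}
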
diff --git a/pruner/window.go b/pruner/window.go index 0a86c535ce..a79732ade2 100644 --- a/pruner/window.go +++ b/pruner/window.go @@ -5,3 +5,13 @@ import ( ) type AvailabilityWindow time.Duration + +// IsWithinAvailabilityWindow checks whether the given timestamp is within the +// given AvailabilityWindow. If the window is disabled (0), it returns true for +// every timestamp. +func IsWithinAvailabilityWindow(t time.Time, window AvailabilityWindow) bool { + if window == AvailabilityWindow(time.Duration(0)) { + return true + } + return time.Since(t) <= time.Duration(window) +}
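
Taken together, the helper above and the short-circuit in core/eds.go reduce to the check sketched below. This is an illustrative, standalone program rather than the code from the diff: the real storeEDS also attaches the ProofsAdder to the context and tolerates dagstore.ErrShardExists, and the 30-day window used here is only an example value, not one defined in this PR.

package main

import (
	"fmt"
	"time"
)

// AvailabilityWindow mirrors pruner.AvailabilityWindow from the diff above.
type AvailabilityWindow time.Duration

// IsWithinAvailabilityWindow mirrors the helper added in pruner/window.go:
// a zero window means pruning is disabled, so every header counts as recent.
func IsWithinAvailabilityWindow(t time.Time, window AvailabilityWindow) bool {
	if window == AvailabilityWindow(time.Duration(0)) {
		return true
	}
	return time.Since(t) <= time.Duration(window)
}

func main() {
	window := AvailabilityWindow(30 * 24 * time.Hour) // example value only

	recent := time.Now().Add(-time.Hour)
	historic := time.Now().Add(-31 * 24 * time.Hour)

	// A syncing bridge node with pruning enabled stores the first EDS and
	// skips the second, matching the short-circuit added to storeEDS.
	fmt.Println(IsWithinAvailabilityWindow(recent, window))   // true  -> store
	fmt.Println(IsWithinAvailabilityWindow(historic, window)) // false -> skip
}
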