Skip to content

Commit

Permalink
feat(core): Bridge node short-circuits storing a historical EDS if pr…
Browse files Browse the repository at this point in the history
…uning is enabled (celestiaorg#3283)

This PR implements short-circuiting storing EDSes in the case that they
are outside the sampling window in the case of a syncing bridge node if
`--experimental-pruning` is enabled.

Overrides celestiaorg#3261
  • Loading branch information
renaynay authored Apr 10, 2024
1 parent 6242fc1 commit 26d411a
Show file tree
Hide file tree
Showing 10 changed files with 286 additions and 71 deletions.
25 changes: 23 additions & 2 deletions core/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ import (
"github.com/celestiaorg/nmt"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/ipld"
)

// extendBlock extends the given block data, returning the resulting
Expand Down Expand Up @@ -51,14 +54,32 @@ func extendShares(s [][]byte, options ...nmt.Option) (*rsmt2d.ExtendedDataSquare
}

// storeEDS will only store extended block if it is not empty and doesn't already exist.
func storeEDS(ctx context.Context, hash share.DataHash, eds *rsmt2d.ExtendedDataSquare, store *eds.Store) error {
func storeEDS(
ctx context.Context,
eh *header.ExtendedHeader,
eds *rsmt2d.ExtendedDataSquare,
adder *ipld.ProofsAdder,
store *eds.Store,
window pruner.AvailabilityWindow,
) error {
if eds == nil {
return nil
}
err := store.Put(ctx, hash, eds)

if !pruner.IsWithinAvailabilityWindow(eh.Time(), window) {
log.Debugw("skipping storage of historic block", "height", eh.Height())
return nil
}

ctx = ipld.CtxWithProofsAdder(ctx, adder)

err := store.Put(ctx, share.DataHash(eh.DataHash), eds)
if errors.Is(err, dagstore.ErrShardExists) {
// block with given root already exists, return nil
return nil
}
if err == nil {
log.Debugw("stored EDS for height", "height", eh.Height())
}
return err
}
28 changes: 16 additions & 12 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/celestiaorg/nmt"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/ipld"
)
Expand All @@ -23,6 +24,8 @@ type Exchange struct {
store *eds.Store
construct header.ConstructFn

availabilityWindow pruner.AvailabilityWindow

metrics *exchangeMetrics
}

Expand All @@ -32,9 +35,9 @@ func NewExchange(
construct header.ConstructFn,
opts ...Option,
) (*Exchange, error) {
p := new(params)
p := defaultParams()
for _, opt := range opts {
opt(p)
opt(&p)
}

var (
Expand All @@ -49,10 +52,11 @@ func NewExchange(
}

return &Exchange{
fetcher: fetcher,
store: store,
construct: construct,
metrics: metrics,
fetcher: fetcher,
store: store,
construct: construct,
availabilityWindow: p.availabilityWindow,
metrics: metrics,
}, nil
}

Expand Down Expand Up @@ -150,11 +154,11 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende
&block.Height, hash, eh.Hash())
}

ctx = ipld.CtxWithProofsAdder(ctx, adder)
err = storeEDS(ctx, eh.DAH.Hash(), eds, ce.store)
err = storeEDS(ctx, eh, eds, adder, ce.store, ce.availabilityWindow)
if err != nil {
return nil, fmt.Errorf("storing EDS to eds.Store for height %d: %w", &block.Height, err)
return nil, err
}

return eh, nil
}

Expand Down Expand Up @@ -190,10 +194,10 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64
panic(fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err))
}

ctx = ipld.CtxWithProofsAdder(ctx, adder)
err = storeEDS(ctx, eh.DAH.Hash(), eds, ce.store)
err = storeEDS(ctx, eh, eds, adder, ce.store, ce.availabilityWindow)
if err != nil {
return nil, fmt.Errorf("storing EDS to eds.Store for block height %d: %w", b.Header.Height, err)
return nil, err
}

return eh, nil
}
142 changes: 131 additions & 11 deletions core/exchange_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package core

import (
"bytes"
"context"
"testing"
"time"

"github.com/cosmos/cosmos-sdk/client/flags"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/assert"
Expand All @@ -13,6 +15,8 @@ import (
"github.com/celestiaorg/celestia-app/test/util/testnode"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
)

Expand All @@ -21,11 +25,10 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {
t.Cleanup(cancel)

cfg := DefaultTestConfig()
cfg.ChainID = networkID
fetcher, _ := createCoreFetcher(t, cfg)
cfg.ChainID = testChainID
fetcher, cctx := createCoreFetcher(t, cfg)

// generate 10 blocks
generateBlocks(t, fetcher)
generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx)

store := createStore(t)

Expand All @@ -39,18 +42,67 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {
genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes())
require.NoError(t, err)

to := uint64(10)
to := uint64(30)
expectedFirstHeightInRange := genHeader.Height() + 1
expectedLastHeightInRange := to - 1
expectedLenHeaders := to - expectedFirstHeightInRange

// request headers from height 1 to 10 [2:10)
// request headers from height 1 to 20 [2:30)
headers, err := ce.GetRangeByHeight(context.Background(), genHeader, to)
require.NoError(t, err)

assert.Len(t, headers, int(expectedLenHeaders))
assert.Equal(t, expectedFirstHeightInRange, headers[0].Height())
assert.Equal(t, expectedLastHeightInRange, headers[len(headers)-1].Height())

for _, h := range headers {
has, err := store.Has(ctx, h.DAH.Hash())
require.NoError(t, err)
assert.True(t, has)
}
}

// TestExchange_DoNotStoreHistoric tests that the CoreExchange will not
// store EDSs that are historic if pruning is enabled.
func TestExchange_DoNotStoreHistoric(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

cfg := DefaultTestConfig()
cfg.ChainID = testChainID
fetcher, cctx := createCoreFetcher(t, cfg)

generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx)

store := createStore(t)

ce, err := NewExchange(
fetcher,
store,
header.MakeExtendedHeader,
WithAvailabilityWindow(pruner.AvailabilityWindow(time.Nanosecond)), // all blocks will be "historic"
)
require.NoError(t, err)

// initialize store with genesis block
genHeight := int64(1)
genBlock, err := fetcher.GetBlock(ctx, &genHeight)
require.NoError(t, err)
genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes())
require.NoError(t, err)

headers, err := ce.GetRangeByHeight(ctx, genHeader, 30)
require.NoError(t, err)

// ensure none of the "historic" EDSs were stored
for _, h := range headers {
if bytes.Equal(h.DataHash, share.EmptyRoot().Hash()) {
continue
}
has, err := store.Has(ctx, h.DAH.Hash())
require.NoError(t, err)
assert.False(t, has)
}
}

func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testnode.Context) {
Expand All @@ -68,14 +120,82 @@ func createStore(t *testing.T) *eds.Store {
storeCfg := eds.DefaultParameters()
store, err := eds.NewStore(storeCfg, t.TempDir(), ds_sync.MutexWrap(ds.NewMapDatastore()))
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err = store.Start(ctx)
require.NoError(t, err)

// store an empty square to initialize EDS store
eds := share.EmptyExtendedDataSquare()
err = store.Put(ctx, share.EmptyRoot().Hash(), eds)
require.NoError(t, err)

t.Cleanup(func() {
err = store.Stop(ctx)
require.NoError(t, err)
})

return store
}

func generateBlocks(t *testing.T, fetcher *BlockFetcher) {
sub, err := fetcher.SubscribeNewBlockEvent(context.Background())
require.NoError(t, err)
// fillBlocks fills blocks until the context is canceled.
func fillBlocks(
t *testing.T,
ctx context.Context,
cfg *testnode.Config,
cctx testnode.Context,
) {
for {
select {
case <-ctx.Done():
return
default:
}

_, err := cctx.FillBlock(16, cfg.Accounts, flags.BroadcastBlock)
require.NoError(t, err)
}
}

for i := 0; i < 10; i++ {
<-sub
// generateNonEmptyBlocks generates at least 20 non-empty blocks
func generateNonEmptyBlocks(
t *testing.T,
ctx context.Context,
fetcher *BlockFetcher,
cfg *testnode.Config,
cctx testnode.Context,
) []share.DataHash {
// generate several non-empty blocks
generateCtx, generateCtxCancel := context.WithCancel(context.Background())

sub, err := fetcher.SubscribeNewBlockEvent(ctx)
require.NoError(t, err)
defer func() {
err = fetcher.UnsubscribeNewBlockEvent(ctx)
require.NoError(t, err)
}()

go fillBlocks(t, generateCtx, cfg, cctx)

hashes := make([]share.DataHash, 0, 20)

i := 0
for i < 20 {
select {
case b, ok := <-sub:
require.True(t, ok)

if !bytes.Equal(b.Data.Hash(), share.EmptyRoot().Hash()) {
hashes = append(hashes, share.DataHash(b.Data.Hash()))
i++
}
case <-ctx.Done():
t.Fatal("failed to fill blocks within timeout")
}
}
generateCtxCancel()

return hashes
}
33 changes: 17 additions & 16 deletions core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/celestiaorg/nmt"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
Expand All @@ -37,8 +38,9 @@ var (
type Listener struct {
fetcher *BlockFetcher

construct header.ConstructFn
store *eds.Store
construct header.ConstructFn
store *eds.Store
availabilityWindow pruner.AvailabilityWindow

headerBroadcaster libhead.Broadcaster[*header.ExtendedHeader]
hashBroadcaster shrexsub.BroadcastFn
Expand All @@ -60,9 +62,9 @@ func NewListener(
blocktime time.Duration,
opts ...Option,
) (*Listener, error) {
p := new(params)
p := defaultParams()
for _, opt := range opts {
opt(p)
opt(&p)
}

var (
Expand All @@ -77,21 +79,22 @@ func NewListener(
}

return &Listener{
fetcher: fetcher,
headerBroadcaster: bcast,
hashBroadcaster: hashBroadcaster,
construct: construct,
store: store,
listenerTimeout: 5 * blocktime,
metrics: metrics,
chainID: p.chainID,
fetcher: fetcher,
headerBroadcaster: bcast,
hashBroadcaster: hashBroadcaster,
construct: construct,
store: store,
availabilityWindow: p.availabilityWindow,
listenerTimeout: 5 * blocktime,
metrics: metrics,
chainID: p.chainID,
}, nil
}

// Start kicks off the Listener listener loop.
func (cl *Listener) Start(context.Context) error {
if cl.cancel != nil {
return errors.New("listener: already started")
return fmt.Errorf("listener: already started")
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -221,9 +224,7 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS
panic(fmt.Errorf("making extended header: %w", err))
}

// attempt to store block data if not empty
ctx = ipld.CtxWithProofsAdder(ctx, adder)
err = storeEDS(ctx, b.Header.DataHash.Bytes(), eds, cl.store)
err = storeEDS(ctx, eh, eds, adder, cl.store, cl.availabilityWindow)
if err != nil {
return fmt.Errorf("storing EDS: %w", err)
}
Expand Down
Loading

0 comments on commit 26d411a

Please sign in to comment.