Merge branch 'shwap' into shwap-availability
walldiss committed Jul 31, 2024
2 parents 2e05c59 + 9118b8e commit 579ca32
Showing 5 changed files with 251 additions and 9 deletions.
100 changes: 100 additions & 0 deletions share/getter.go
@@ -0,0 +1,100 @@
package share

import (
	"context"
	"errors"
	"fmt"

	"github.com/celestiaorg/nmt"
	"github.com/celestiaorg/rsmt2d"

	"github.com/celestiaorg/celestia-node/header"
)

var (
	// ErrNotFound is used to indicate that requested data could not be found.
	ErrNotFound = errors.New("share: data not found")
	// ErrOutOfBounds is used to indicate that a passed row or column index is out of bounds of the
	// square size.
	ErrOutOfBounds = errors.New("share: row or column index is larger than square size")
)

// Getter interface provides a set of accessors for shares by the Root.
// It automatically verifies the integrity of shares (exceptions are possible depending on the
// implementation).
//
//go:generate mockgen -destination=mocks/getter.go -package=mocks . Getter
type Getter interface {
	// GetShare gets a Share by coordinates in EDS.
	GetShare(ctx context.Context, header *header.ExtendedHeader, row, col int) (Share, error)

	// GetEDS gets the full EDS identified by the given extended header.
	GetEDS(context.Context, *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error)

	// GetSharesByNamespace gets all shares from an EDS within the given namespace.
	// Shares are returned in row-by-row order if the namespace spans multiple rows.
	// Inclusion of the returned data can be verified using the Verify method on NamespacedShares.
	// If no shares are found for the target namespace, non-inclusion can also be verified by
	// calling the Verify method.
	GetSharesByNamespace(context.Context, *header.ExtendedHeader, Namespace) (NamespacedShares, error)
}

// NamespacedShares represents all shares with proofs within a specific namespace of an EDS.
type NamespacedShares []NamespacedRow

// Flatten returns the concatenated slice of all NamespacedRow shares.
func (ns NamespacedShares) Flatten() []Share {
	var shares []Share
	for _, row := range ns {
		shares = append(shares, row.Shares...)
	}
	return shares
}

// NamespacedRow represents all shares with proofs within a specific namespace of a single EDS row.
type NamespacedRow struct {
	Shares []Share    `json:"shares"`
	Proof  *nmt.Proof `json:"proof"`
}

// Verify validates NamespacedShares by checking every row with nmt inclusion proof.
func (ns NamespacedShares) Verify(root *AxisRoots, namespace Namespace) error {
	var originalRoots [][]byte
	for _, row := range root.RowRoots {
		if !namespace.IsOutsideRange(row, row) {
			originalRoots = append(originalRoots, row)
		}
	}

	if len(originalRoots) != len(ns) {
		return fmt.Errorf("number of rows differs between root and namespace shares: expected %d, got %d",
			len(originalRoots), len(ns))
	}

	for i, row := range ns {
		if row.Proof == nil && row.Shares == nil {
			return fmt.Errorf("row verification failed: no proof and no shares")
		}
		// verify row data against the row hash from the original root
		if !row.Verify(originalRoots[i], namespace) {
			return fmt.Errorf("row verification failed: row %d doesn't match original root: %s", i, root.String())
		}
	}
	return nil
}

// Verify validates the row using nmt inclusion proof.
func (row *NamespacedRow) Verify(rowRoot []byte, namespace Namespace) bool {
	// construct nmt leaves from shares by prepending namespace
	leaves := make([][]byte, 0, len(row.Shares))
	for _, shr := range row.Shares {
		leaves = append(leaves, append(GetNamespace(shr), shr...))
	}

	// verify namespace
	return row.Proof.VerifyNamespace(
		NewSHA256Hasher(),
		namespace.ToNMT(),
		leaves,
		rowRoot,
	)
}
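The new Getter API is meant to be consumed as fetch-then-verify. Below is a hypothetical sketch of that call pattern, not part of this commit: fetchVerified and its parameter names are stand-ins, and it assumes the header's DAH field carries the *share.AxisRoots that Verify expects.

package shareexample

import (
	"context"

	"github.com/celestiaorg/celestia-node/header"
	"github.com/celestiaorg/celestia-node/share"
)

// fetchVerified is a hypothetical helper showing the intended call pattern:
// fetch all shares for a namespace, verify the NMT proofs against the row
// roots, then flatten the per-row results into a single slice.
func fetchVerified(
	ctx context.Context,
	getter share.Getter,
	hdr *header.ExtendedHeader,
	ns share.Namespace,
) ([]share.Share, error) {
	nShares, err := getter.GetSharesByNamespace(ctx, hdr, ns)
	if err != nil {
		return nil, err // e.g. share.ErrNotFound
	}
	// Verify covers both inclusion (rows with shares) and non-inclusion
	// (absence proofs for an empty namespace). Assumes hdr.DAH is the
	// *share.AxisRoots the header commits to.
	if err := nShares.Verify(hdr.DAH, ns); err != nil {
		return nil, err
	}
	return nShares.Flatten(), nil
}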
51 changes: 51 additions & 0 deletions share/ipld/corrupted_data_test.go
@@ -0,0 +1,51 @@
package ipld_test

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"github.com/celestiaorg/celestia-node/header/headertest"
	"github.com/celestiaorg/celestia-node/share"
	"github.com/celestiaorg/celestia-node/share/availability/full"
	availability_test "github.com/celestiaorg/celestia-node/share/availability/test"
	"github.com/celestiaorg/celestia-node/share/getters"
)

// sharesAvailableTimeout is an arbitrarily picked interval of time in which a TestNode is expected
// to be able to complete a SharesAvailable request from a connected peer in a TestDagNet.
const sharesAvailableTimeout = 2 * time.Second

// TestNamespaceHasher_CorruptedData is an integration test that verifies that the NamespaceHasher
// of a recipient of corrupted data will not panic, and will throw away the corrupted data.
func TestNamespaceHasher_CorruptedData(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)
	net := availability_test.NewTestDAGNet(ctx, t)

	requester := full.Node(net)
	provider, mockBS := availability_test.MockNode(t, net)
	provider.Availability = full.TestAvailability(t, getters.NewIPLDGetter(provider.BlockService))
	net.ConnectAll()

	// before the provider starts attacking, we should be able to retrieve successfully. We pass a
	// size-16 block, but this is not important to the test and any valid block size behaves the same.
	root := availability_test.RandFillBS(t, 16, provider.BlockService)

	eh := headertest.RandExtendedHeaderWithRoot(t, root)
	getCtx, cancelGet := context.WithTimeout(ctx, sharesAvailableTimeout)
	t.Cleanup(cancelGet)
	err := requester.SharesAvailable(getCtx, eh)
	require.NoError(t, err)

	// clear the storage of the requester so that it must retrieve again, then start attacking;
	// we reinitialize the node to clear the eds store
	requester = full.Node(net)
	mockBS.Attacking = true
	getCtx, cancelGet = context.WithTimeout(ctx, sharesAvailableTimeout)
	t.Cleanup(cancelGet)
	err = requester.SharesAvailable(getCtx, eh)
	require.ErrorIs(t, err, share.ErrNotAvailable)
}
83 changes: 83 additions & 0 deletions share/mocks/getter.go

Generated file; diff not rendered.

24 changes: 16 additions & 8 deletions store/store.go
@@ -13,6 +13,7 @@ import (

	"github.com/celestiaorg/rsmt2d"

+	"github.com/celestiaorg/celestia-node/libs/utils"
	"github.com/celestiaorg/celestia-node/share"
	eds "github.com/celestiaorg/celestia-node/share/new_eds"
	"github.com/celestiaorg/celestia-node/store/cache"
@@ -101,24 +102,31 @@ func (s *Store) Put(
	height uint64,
	square *rsmt2d.ExtendedDataSquare,
) error {
+	datahash := share.DataHash(roots.Hash())
+	// we don't need to store empty EDS, just link the height to the empty file
+	if datahash.IsEmptyEDS() {
+		lock := s.stripLock.byHeight(height)
+		lock.Lock()
+		err := s.ensureHeightLink(roots.Hash(), height)
+		lock.Unlock()
+		return err
+	}
+
	// put to cache before writing to make it accessible while write is happening
	accessor := &eds.Rsmt2D{ExtendedDataSquare: square}
-	_, err := s.cache.First().GetOrLoad(ctx, height, accessorLoader(accessor))
+	acc, err := s.cache.First().GetOrLoad(ctx, height, accessorLoader(accessor))
	if err != nil {
		log.Warnf("failed to put Accessor in the recent cache: %s", err)
+	} else {
+		// release the ref link to the accessor
+		utils.CloseAndLog(log, "recent accessor", acc)
	}

	tNow := time.Now()
-	datahash := share.DataHash(roots.Hash())
-	lock := s.stripLock.byDatahashAndHeight(datahash, height)
+	lock := s.stripLock.byHashAndHeight(datahash, height)
	lock.lock()
	defer lock.unlock()

-	if datahash.IsEmptyEDS() {
-		err := s.ensureHeightLink(roots.Hash(), height)
-		return err
-	}
-
	exists, err := s.createFile(square, roots, height)
	if exists {
		s.metrics.observePutExist(ctx)
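utils.CloseAndLog is referenced above but its implementation is not part of this diff. Presumably it releases the cache's reference to the accessor and logs any Close error; the following is a minimal sketch under that assumption (the real helper lives in libs/utils and may differ):

package utilsketch

import "io"

// logger captures the one method this sketch needs from the node's logger.
type logger interface {
	Warnf(format string, args ...interface{})
}

// CloseAndLog closes the closer and logs a warning on failure, so callers
// like Store.Put can release a reference without extra error plumbing.
func CloseAndLog(log logger, name string, closer io.Closer) {
	if err := closer.Close(); err != nil {
		log.Warnf("closing %s: %s", name, err)
	}
}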
2 changes: 1 addition & 1 deletion store/striplock.go
@@ -38,7 +38,7 @@ func (l *striplock) byHash(datahash share.DataHash) *sync.RWMutex {
	return l.datahashes[lkIdx]
}

-func (l *striplock) byDatahashAndHeight(datahash share.DataHash, height uint64) *multiLock {
+func (l *striplock) byHashAndHeight(datahash share.DataHash, height uint64) *multiLock {
	return &multiLock{[]*sync.RWMutex{l.byHash(datahash), l.byHeight(height)}}
}
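For context on the renamed helper: a striped lock maps an unbounded key space (hashes, heights) onto a fixed pool of mutexes, so writes to different heights rarely contend while the pool stays small. A minimal self-contained sketch of the technique follows; it is an illustration, not the actual striplock.go implementation, and the byHash index scheme is one plausible choice.

package striping

import "sync"

// stripLock holds fixed pools of mutexes; keys are folded into an index.
type stripLock struct {
	heights    []*sync.RWMutex
	datahashes []*sync.RWMutex
}

func newStripLock(n int) *stripLock {
	heights := make([]*sync.RWMutex, n)
	hashes := make([]*sync.RWMutex, n)
	for i := 0; i < n; i++ {
		heights[i] = &sync.RWMutex{}
		hashes[i] = &sync.RWMutex{}
	}
	return &stripLock{heights: heights, datahashes: hashes}
}

// multiLock bundles several stripes so callers can hold, e.g., the hash
// stripe and the height stripe together, as byHashAndHeight does above.
type multiLock struct {
	mus []*sync.RWMutex
}

func (m *multiLock) lock() {
	for _, mu := range m.mus {
		mu.Lock()
	}
}

func (m *multiLock) unlock() {
	for _, mu := range m.mus {
		mu.Unlock()
	}
}

// byHeight picks the stripe for a height by simple modulo.
func (l *stripLock) byHeight(height uint64) *sync.RWMutex {
	return l.heights[height%uint64(len(l.heights))]
}

// byHash folds the hash's trailing bytes into a stripe index — one plausible
// scheme for the lkIdx lookup visible in the hunk above.
func (l *stripLock) byHash(datahash []byte) *sync.RWMutex {
	lkIdx := (uint32(datahash[len(datahash)-2])<<8 | uint32(datahash[len(datahash)-1])) % uint32(len(l.datahashes))
	return l.datahashes[lkIdx]
}

func (l *stripLock) byHashAndHeight(datahash []byte, height uint64) *multiLock {
	return &multiLock{[]*sync.RWMutex{l.byHash(datahash), l.byHeight(height)}}
}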

