Skip to content

Commit

Permalink
Merge branch 'shwap' into shwap-availability
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss committed Jul 31, 2024
2 parents 2e05c59 + 9118b8e commit d6bfea3
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 9 deletions.
51 changes: 51 additions & 0 deletions share/ipld/corrupted_data_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package ipld_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/header/headertest"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability/full"
availability_test "github.com/celestiaorg/celestia-node/share/availability/test"
"github.com/celestiaorg/celestia-node/share/getters"
)

// sharesAvailableTimeout is an arbitrarily picked interval of time in which a TestNode is expected
// to be able to complete a SharesAvailable request from a connected peer in a TestDagNet.
const sharesAvailableTimeout = 2 * time.Second

// TestNamespaceHasher_CorruptedData is an integration test that verifies that the NamespaceHasher
// of a recipient of corrupted data will not panic, and will throw away the corrupted data.
func TestNamespaceHasher_CorruptedData(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
net := availability_test.NewTestDAGNet(ctx, t)

requester := full.Node(net)
provider, mockBS := availability_test.MockNode(t, net)
provider.Availability = full.TestAvailability(t, getters.NewIPLDGetter(provider.BlockService))
net.ConnectAll()

// before the provider starts attacking, we should be able to retrieve successfully. We pass a size
// 16 block, but this is not important to the test and any valid block size behaves the same.
root := availability_test.RandFillBS(t, 16, provider.BlockService)

eh := headertest.RandExtendedHeaderWithRoot(t, root)
getCtx, cancelGet := context.WithTimeout(ctx, sharesAvailableTimeout)
t.Cleanup(cancelGet)
err := requester.SharesAvailable(getCtx, eh)
require.NoError(t, err)

// clear the storage of the requester so that it must retrieve again, then start attacking
// we reinitialize the node to clear the eds store
requester = full.Node(net)
mockBS.Attacking = true
getCtx, cancelGet = context.WithTimeout(ctx, sharesAvailableTimeout)
t.Cleanup(cancelGet)
err = requester.SharesAvailable(getCtx, eh)
require.ErrorIs(t, err, share.ErrNotAvailable)
}
83 changes: 83 additions & 0 deletions share/mocks/getter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 16 additions & 8 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/libs/utils"
"github.com/celestiaorg/celestia-node/share"
eds "github.com/celestiaorg/celestia-node/share/new_eds"
"github.com/celestiaorg/celestia-node/store/cache"
Expand Down Expand Up @@ -101,24 +102,31 @@ func (s *Store) Put(
height uint64,
square *rsmt2d.ExtendedDataSquare,
) error {
datahash := share.DataHash(roots.Hash())
// we don't need to store empty EDS, just link the height to the empty file
if datahash.IsEmptyEDS() {
lock := s.stripLock.byHeight(height)
lock.Lock()
err := s.ensureHeightLink(roots.Hash(), height)
lock.Unlock()
return err
}

// put to cache before writing to make it accessible while write is happening
accessor := &eds.Rsmt2D{ExtendedDataSquare: square}
_, err := s.cache.First().GetOrLoad(ctx, height, accessorLoader(accessor))
acc, err := s.cache.First().GetOrLoad(ctx, height, accessorLoader(accessor))
if err != nil {
log.Warnf("failed to put Accessor in the recent cache: %s", err)
} else {
// release the ref link to the accessor
utils.CloseAndLog(log, "recent accessor", acc)
}

tNow := time.Now()
datahash := share.DataHash(roots.Hash())
lock := s.stripLock.byDatahashAndHeight(datahash, height)
lock := s.stripLock.byHashAndHeight(datahash, height)
lock.lock()
defer lock.unlock()

if datahash.IsEmptyEDS() {
err := s.ensureHeightLink(roots.Hash(), height)
return err
}

exists, err := s.createFile(square, roots, height)
if exists {
s.metrics.observePutExist(ctx)
Expand Down
2 changes: 1 addition & 1 deletion store/striplock.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (l *striplock) byHash(datahash share.DataHash) *sync.RWMutex {
return l.datahashes[lkIdx]
}

func (l *striplock) byDatahashAndHeight(datahash share.DataHash, height uint64) *multiLock {
func (l *striplock) byHashAndHeight(datahash share.DataHash, height uint64) *multiLock {
return &multiLock{[]*sync.RWMutex{l.byHash(datahash), l.byHeight(height)}}
}

Expand Down

0 comments on commit d6bfea3

Please sign in to comment.