-
Notifications
You must be signed in to change notification settings - Fork 925
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
52 changed files
with
7,624 additions
and
49 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package utils | ||
|
||
import ( | ||
"io" | ||
|
||
logging "github.com/ipfs/go-log/v2" | ||
) | ||
|
||
func CloseAndLog(log logging.StandardLogger, name string, closer io.Closer) { | ||
if err := closer.Close(); err != nil { | ||
log.Warnf("closing %s: %s", name, err) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
package shwap | ||
|
||
import ( | ||
"encoding/binary" | ||
"fmt" | ||
|
||
"github.com/celestiaorg/celestia-node/share" | ||
) | ||
|
||
// NamespaceDataIDSize defines the total size of a RowNamespaceDataID in bytes, combining the | ||
// size of a RowID and the size of a Namespace. | ||
const NamespaceDataIDSize = EdsIDSize + 4 + share.NamespaceSize | ||
|
||
// RowNamespaceDataID uniquely identifies a piece of namespaced data within a row of an Extended | ||
// Data Square (EDS). | ||
type NamespaceDataID struct { | ||
// Embedding EdsID to include the block height in RowID. | ||
EdsID | ||
// FromRow and ToRow specify the range of rows within the data square. | ||
FromRowIndex, ToRowIndex int | ||
// DataNamespace is a string representation of the namespace to facilitate comparisons. | ||
DataNamespace share.Namespace | ||
} | ||
|
||
// NewNamespaceDataID creates a new RowNamespaceDataID with the specified parameters. It | ||
// validates the RowNamespaceDataID against the provided Root before returning. | ||
func NewNamespaceDataID( | ||
height uint64, | ||
fromRowIndex, toRowIndex int, | ||
namespace share.Namespace, | ||
edsSize int, | ||
) (NamespaceDataID, error) { | ||
ndid := NamespaceDataID{ | ||
EdsID: EdsID{ | ||
Height: height, | ||
}, | ||
FromRowIndex: fromRowIndex, | ||
ToRowIndex: toRowIndex, | ||
DataNamespace: namespace, | ||
} | ||
|
||
if err := ndid.Verify(edsSize); err != nil { | ||
return NamespaceDataID{}, err | ||
} | ||
return ndid, nil | ||
} | ||
|
||
// NamespaceDataIDFromBinary deserializes a RowNamespaceDataID from its binary form. It returns | ||
// an error if the binary data's length does not match the expected size. | ||
func NamespaceDataIDFromBinary(data []byte) (NamespaceDataID, error) { | ||
if len(data) != NamespaceDataIDSize { | ||
return NamespaceDataID{}, | ||
fmt.Errorf("invalid RowNamespaceDataID length: expected %d, got %d", RowNamespaceDataIDSize, len(data)) | ||
} | ||
|
||
edsID, err := EdsIDFromBinary(data[:EdsIDSize]) | ||
if err != nil { | ||
return NamespaceDataID{}, fmt.Errorf("error unmarshaling RowID: %w", err) | ||
} | ||
|
||
fromRowIndex := int(binary.BigEndian.Uint16(data[EdsIDSize:])) | ||
toRowIndex := int(binary.BigEndian.Uint16(data[EdsIDSize+2:])) | ||
ns := share.Namespace(data[EdsIDSize+4:]) | ||
if err := ns.ValidateForData(); err != nil { | ||
return NamespaceDataID{}, fmt.Errorf("error validating DataNamespace: %w", err) | ||
} | ||
|
||
return NamespaceDataID{ | ||
EdsID: edsID, | ||
FromRowIndex: fromRowIndex, | ||
ToRowIndex: toRowIndex, | ||
DataNamespace: ns, | ||
}, nil | ||
} | ||
|
||
// MarshalBinary encodes RowNamespaceDataID into binary form. | ||
// NOTE: Proto is avoided because | ||
// * Its size is not deterministic which is required for IPLD. | ||
// * No support for uint16 | ||
func (ndid NamespaceDataID) MarshalBinary() ([]byte, error) { | ||
data := make([]byte, 0, NamespaceDataIDSize) | ||
return ndid.appendTo(data), nil | ||
} | ||
|
||
// Verify checks the validity of RowNamespaceDataID's fields, including the RowID and the | ||
// namespace. | ||
func (ndid NamespaceDataID) Verify(edsSize int) error { | ||
if ndid.FromRowIndex >= edsSize { | ||
return fmt.Errorf("FromRowIndex: %w: %d >= %d", ErrOutOfBounds, ndid.FromRowIndex, edsSize) | ||
} | ||
if ndid.ToRowIndex >= edsSize { | ||
return fmt.Errorf("ToRowIndex: %w: %d >= %d", ErrOutOfBounds, ndid.ToRowIndex, edsSize) | ||
} | ||
return ndid.Validate() | ||
} | ||
|
||
func (ndid NamespaceDataID) Validate() error { | ||
if err := ndid.EdsID.Validate(); err != nil { | ||
return fmt.Errorf("error validating RowID: %w", err) | ||
} | ||
if ndid.FromRowIndex > ndid.ToRowIndex { | ||
return fmt.Errorf("%w: FromRowIndex %d is greater than ToRowIndex %d", | ||
ErrInvalidShwapID, ndid.FromRowIndex, ndid.ToRowIndex) | ||
} | ||
if ndid.FromRowIndex < 0 { | ||
return fmt.Errorf("%w: FromRowIndex %d", ErrInvalidShwapID, ndid.FromRowIndex) | ||
} | ||
if ndid.ToRowIndex < 0 { | ||
return fmt.Errorf("%w: ToRowIndex %d", ErrInvalidShwapID, ndid.ToRowIndex) | ||
} | ||
if err := ndid.DataNamespace.ValidateForData(); err != nil { | ||
return fmt.Errorf("%w: error validating DataNamespace: %w", ErrInvalidShwapID, err) | ||
} | ||
return nil | ||
} | ||
|
||
// appendTo helps in appending the binary form of DataNamespace to the serialized RowID data. | ||
func (ndid NamespaceDataID) appendTo(data []byte) []byte { | ||
data = ndid.EdsID.appendTo(data) | ||
data = binary.BigEndian.AppendUint16(data, uint16(ndid.FromRowIndex)) | ||
data = binary.BigEndian.AppendUint16(data, uint16(ndid.ToRowIndex)) | ||
return append(data, ndid.DataNamespace...) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
package shwap | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/celestiaorg/celestia-node/share/sharetest" | ||
) | ||
|
||
func TestNamespaceDataID(t *testing.T) { | ||
odsSize := 4 | ||
ns := sharetest.RandV0Namespace() | ||
|
||
id, err := NewNamespaceDataID(1, 1, 2, ns, odsSize*2) | ||
require.NoError(t, err) | ||
|
||
data, err := id.MarshalBinary() | ||
require.NoError(t, err) | ||
|
||
sidOut, err := NamespaceDataIDFromBinary(data) | ||
require.NoError(t, err) | ||
assert.EqualValues(t, id, sidOut) | ||
|
||
err = sidOut.Verify(odsSize * 2) | ||
require.NoError(t, err) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
package discovery | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"sync" | ||
"time" | ||
|
||
"github.com/libp2p/go-libp2p/core/host" | ||
"github.com/libp2p/go-libp2p/core/peer" | ||
"github.com/libp2p/go-libp2p/p2p/discovery/backoff" | ||
) | ||
|
||
const ( | ||
// gcInterval is a default period after which disconnected peers will be removed from cache | ||
gcInterval = time.Minute | ||
// connectTimeout is the timeout used for dialing peers and discovering peer addresses. | ||
connectTimeout = time.Minute * 2 | ||
) | ||
|
||
var ( | ||
defaultBackoffFactory = backoff.NewFixedBackoff(time.Minute * 10) | ||
errBackoffNotEnded = errors.New("share/discovery: backoff period has not ended") | ||
) | ||
|
||
// backoffConnector wraps a libp2p.Host to establish a connection with peers | ||
// with adding a delay for the next connection attempt. | ||
type backoffConnector struct { | ||
h host.Host | ||
backoff backoff.BackoffFactory | ||
|
||
cacheLk sync.Mutex | ||
cacheData map[peer.ID]backoffData | ||
} | ||
|
||
// backoffData stores time when next connection attempt with the remote peer. | ||
type backoffData struct { | ||
nexttry time.Time | ||
backoff backoff.BackoffStrategy | ||
} | ||
|
||
func newBackoffConnector(h host.Host, factory backoff.BackoffFactory) *backoffConnector { | ||
return &backoffConnector{ | ||
h: h, | ||
backoff: factory, | ||
cacheData: make(map[peer.ID]backoffData), | ||
} | ||
} | ||
|
||
// Connect puts peer to the backoffCache and tries to establish a connection with it. | ||
func (b *backoffConnector) Connect(ctx context.Context, p peer.AddrInfo) error { | ||
if b.HasBackoff(p.ID) { | ||
return errBackoffNotEnded | ||
} | ||
|
||
ctx, cancel := context.WithTimeout(ctx, connectTimeout) | ||
defer cancel() | ||
|
||
err := b.h.Connect(ctx, p) | ||
// we don't want to add backoff when the context is canceled. | ||
if !errors.Is(err, context.Canceled) { | ||
b.Backoff(p.ID) | ||
} | ||
return err | ||
} | ||
|
||
// Backoff adds or extends backoff delay for the peer. | ||
func (b *backoffConnector) Backoff(p peer.ID) { | ||
b.cacheLk.Lock() | ||
defer b.cacheLk.Unlock() | ||
|
||
data, ok := b.cacheData[p] | ||
if !ok { | ||
data = backoffData{} | ||
data.backoff = b.backoff() | ||
b.cacheData[p] = data | ||
} | ||
|
||
data.nexttry = time.Now().Add(data.backoff.Delay()) | ||
b.cacheData[p] = data | ||
} | ||
|
||
// HasBackoff checks if peer is in backoff. | ||
func (b *backoffConnector) HasBackoff(p peer.ID) bool { | ||
b.cacheLk.Lock() | ||
cache, ok := b.cacheData[p] | ||
b.cacheLk.Unlock() | ||
return ok && time.Now().Before(cache.nexttry) | ||
} | ||
|
||
// GC is a perpetual GCing loop. | ||
func (b *backoffConnector) GC(ctx context.Context) { | ||
ticker := time.NewTicker(gcInterval) | ||
defer ticker.Stop() | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case <-ticker.C: | ||
b.cacheLk.Lock() | ||
for id, cache := range b.cacheData { | ||
if cache.nexttry.Before(time.Now()) { | ||
delete(b.cacheData, id) | ||
} | ||
} | ||
b.cacheLk.Unlock() | ||
} | ||
} | ||
} | ||
|
||
func (b *backoffConnector) Size() int { | ||
b.cacheLk.Lock() | ||
defer b.cacheLk.Unlock() | ||
return len(b.cacheData) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package discovery | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
"github.com/libp2p/go-libp2p/core/host" | ||
"github.com/libp2p/go-libp2p/p2p/discovery/backoff" | ||
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestBackoff_ConnectPeer(t *testing.T) { | ||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) | ||
t.Cleanup(cancel) | ||
m, err := mocknet.FullMeshLinked(2) | ||
require.NoError(t, err) | ||
b := newBackoffConnector(m.Hosts()[0], backoff.NewFixedBackoff(time.Minute)) | ||
info := host.InfoFromHost(m.Hosts()[1]) | ||
require.NoError(t, b.Connect(ctx, *info)) | ||
} | ||
|
||
func TestBackoff_ConnectPeerFails(t *testing.T) { | ||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) | ||
t.Cleanup(cancel) | ||
m, err := mocknet.FullMeshLinked(2) | ||
require.NoError(t, err) | ||
b := newBackoffConnector(m.Hosts()[0], backoff.NewFixedBackoff(time.Minute)) | ||
info := host.InfoFromHost(m.Hosts()[1]) | ||
require.NoError(t, b.Connect(ctx, *info)) | ||
|
||
require.Error(t, b.Connect(ctx, *info)) | ||
} | ||
|
||
func TestBackoff_ResetBackoffPeriod(t *testing.T) { | ||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) | ||
t.Cleanup(cancel) | ||
m, err := mocknet.FullMeshLinked(2) | ||
require.NoError(t, err) | ||
b := newBackoffConnector(m.Hosts()[0], backoff.NewFixedBackoff(time.Minute)) | ||
info := host.InfoFromHost(m.Hosts()[1]) | ||
require.NoError(t, b.Connect(ctx, *info)) | ||
nexttry := b.cacheData[info.ID].nexttry | ||
b.Backoff(info.ID) | ||
require.True(t, b.cacheData[info.ID].nexttry.After(nexttry)) | ||
} |
Oops, something went wrong.