Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: add subjective init opts #37

Closed
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ require (
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multiaddr v0.8.0 // indirect
github.com/multiformats/go-multiaddr v0.8.0
github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
github.com/multiformats/go-multibase v0.1.1 // indirect
Expand Down
2 changes: 1 addition & 1 deletion headertest/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (m *Store[H]) Height() uint64 {
return uint64(m.HeadHeight)
}

func (m *Store[H]) Head(context.Context) (H, error) {
func (m *Store[H]) Head(context.Context, ...header.Option) (H, error) {
return m.Headers[m.HeadHeight], nil
}

Expand Down
16 changes: 15 additions & 1 deletion interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,5 +127,19 @@ type Getter[H Header] interface {
// reporting it.
type Head[H Header] interface {
// Head returns the latest known header.
Head(context.Context) (H, error)
Head(context.Context, ...Option) (H, error)
}

type Option func(*RequestOptions)

type RequestOptions struct {
// subjective initialization just means that the node has to really trust
// the sync target that it sets in order to prevent a long range attack
SubjectiveInit bool
}

func WithSubjectiveInit(subjInit bool) Option {
return func(o *RequestOptions) {
o.SubjectiveInit = subjInit
}
}
2 changes: 1 addition & 1 deletion local/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (l *Exchange[H]) Stop(context.Context) error {
return nil
}

func (l *Exchange[H]) Head(ctx context.Context) (H, error) {
func (l *Exchange[H]) Head(ctx context.Context, opts ...header.Option) (H, error) {
return l.store.Head(ctx)
}

Expand Down
3 changes: 2 additions & 1 deletion p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func NewExchange[H header.Header](
peerTracker: newPeerTracker(
host,
connGater,
params.peerstore,
),
Params: params,
}
Expand Down Expand Up @@ -113,7 +114,7 @@ func (ex *Exchange[H]) Stop(ctx context.Context) error {
// The Head must be verified thereafter where possible.
// We request in parallel all the trusted peers, compare their response
// and return the highest one.
func (ex *Exchange[H]) Head(ctx context.Context) (H, error) {
func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.Option) (H, error) {
log.Debug("requesting head")

reqCtx := ctx
Expand Down
25 changes: 21 additions & 4 deletions p2p/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/celestiaorg/go-header/headertest"
p2p_pb "github.com/celestiaorg/go-header/p2p/pb"
"github.com/celestiaorg/go-libp2p-messenger/serde"

pstore "github.com/celestiaorg/go-header/p2p/peerstore"
)

const networkID = "private"
Expand Down Expand Up @@ -50,7 +52,7 @@ func TestExchange_RequestHead_UnresponsivePeer(t *testing.T) {
goodStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 5)
_ = server(ctx, t, hosts[1], goodStore)

badStore := &timedOutStore{timeout: time.Millisecond*500} // simulates peer that does not respond
badStore := &timedOutStore{timeout: time.Millisecond * 500} // simulates peer that does not respond
_ = server(ctx, t, hosts[2], badStore)

ctx, cancel = context.WithTimeout(ctx, time.Millisecond*500)
Expand Down Expand Up @@ -447,6 +449,7 @@ func createMocknet(t *testing.T, amount int) []libhost.Host {
func createP2PExAndServer(
t *testing.T,
host, tpeer libhost.Host,
goodPeers ...libhost.Host,
) (*Exchange[*headertest.DummyHeader], *headertest.Store[*headertest.DummyHeader]) {
store := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 5)
serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](tpeer, store,
Expand All @@ -457,9 +460,16 @@ func createP2PExAndServer(
require.NoError(t, err)
connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
require.NoError(t, err)
peers := []peer.AddrInfo{tpeer.Peerstore().PeerInfo(tpeer.ID())}
for _, p := range goodPeers {
peers = append(peers, p.Peerstore().PeerInfo(p.ID()))
}
mockPeerstore := pstore.NewPeerStore(sync.MutexWrap(datastore.NewMapDatastore()))
mockPeerstore.Put(context.Background(), peers)
ex, err := NewExchange[*headertest.DummyHeader](host, []peer.ID{tpeer.ID()}, connGater,
WithNetworkID[ClientParameters](networkID),
WithChainID(networkID),
WithPeerPersistence[ClientParameters](mockPeerstore),
)
require.NoError(t, err)
require.NoError(t, ex.Start(context.Background()))
Expand Down Expand Up @@ -488,8 +498,15 @@ func quicHosts(t *testing.T, n int) []libhost.Host {
return hosts
}

func client(ctx context.Context, t *testing.T, host libhost.Host, trusted []peer.ID) *Exchange[*headertest.DummyHeader] {
client, err := NewExchange[*headertest.DummyHeader](host, trusted, nil)
func client(ctx context.Context, t *testing.T, host libhost.Host, trusted []peer.ID, goodPeers ...libhost.Host) *Exchange[*headertest.DummyHeader] {
peers := []peer.AddrInfo{}
for _, p := range goodPeers {
peers = append(peers, p.Peerstore().PeerInfo(p.ID()))
}
mockPeerstore := pstore.NewPeerStore(sync.MutexWrap(datastore.NewMapDatastore()))
mockPeerstore.Put(context.Background(), peers)
client, err := NewExchange[*headertest.DummyHeader](
host, trusted, nil, WithPeerPersistence[ClientParameters](mockPeerstore))
require.NoError(t, err)
err = client.Start(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -522,7 +539,7 @@ func (t *timedOutStore) HasAt(_ context.Context, _ uint64) bool {
return true
}

func (t *timedOutStore) Head(_ context.Context) (*headertest.DummyHeader, error) {
func (t *timedOutStore) Head(_ context.Context, opts ...header.Option) (*headertest.DummyHeader, error) {
time.Sleep(t.timeout)
return nil, header.ErrNoHead
}
15 changes: 15 additions & 0 deletions p2p/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package p2p
import (
"fmt"
"time"

"github.com/celestiaorg/go-header/p2p/peerstore"
)

// parameters is an interface that encompasses all params needed for
Expand Down Expand Up @@ -119,6 +121,8 @@ type ClientParameters struct {
networkID string
// chainID is an identifier of the chain.
chainID string
// peerstore is a storage for peers.
peerstore peerstore.Peerstore
}

// DefaultClientParameters returns the default params to configure the store.
Expand Down Expand Up @@ -168,3 +172,14 @@ func WithChainID[T ClientParameters](chainID string) Option[T] {
}
}
}

// WithPeerPersistence is a functional option that configures the
// `peerstore` parameter.
func WithPeerPersistence[T ClientParameters](pstore peerstore.Peerstore) Option[T] {
return func(p *T) {
switch t := any(p).(type) { //nolint:gocritic
case *ClientParameters:
t.peerstore = pstore
}
}
}
28 changes: 26 additions & 2 deletions p2p/peer_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/net/conngater"

"github.com/celestiaorg/go-header/p2p/peerstore"
)

const (
Expand All @@ -24,7 +26,7 @@ var (
// otherwise it will be removed on the next GC cycle.
maxAwaitingTime = time.Hour
// gcCycle defines the duration after which the peerTracker starts removing peers.
gcCycle = time.Minute * 30
gcCycleDefault = time.Minute * 1
)

type peerTracker struct {
Expand All @@ -40,6 +42,9 @@ type peerTracker struct {
// online until pruneDeadline, it will be removed and its score will be lost.
disconnectedPeers map[peer.ID]*peerStat

// peerstore is used to store peers periodically.
peerstore peerstore.Peerstore

ctx context.Context
cancel context.CancelFunc
// done is used to gracefully stop the peerTracker.
Expand All @@ -50,13 +55,15 @@ type peerTracker struct {
func newPeerTracker(
h host.Host,
connGater *conngater.BasicConnectionGater,
peerstore peerstore.Peerstore,
) *peerTracker {
ctx, cancel := context.WithCancel(context.Background())
return &peerTracker{
host: h,
connGater: connGater,
disconnectedPeers: make(map[peer.ID]*peerStat),
trackedPeers: make(map[peer.ID]*peerStat),
peerstore: peerstore,
ctx: ctx,
cancel: cancel,
done: make(chan struct{}, 2),
Expand Down Expand Up @@ -159,7 +166,7 @@ func (p *peerTracker) peers() []*peerStat {
// * disconnected peers which have been disconnected for more than maxAwaitingTime;
// * connected peers whose scores are less than or equal than defaultScore;
func (p *peerTracker) gc() {
ticker := time.NewTicker(gcCycle)
ticker := time.NewTicker(gcCycleDefault)
for {
select {
case <-p.ctx.Done():
Expand All @@ -179,7 +186,24 @@ func (p *peerTracker) gc() {
delete(p.trackedPeers, id)
}
}

trackedPeersCopy := make(map[peer.ID]struct{}, len(p.trackedPeers))
for id := range p.trackedPeers {
trackedPeersCopy[id] = struct{}{}
}
p.peerLk.Unlock()

peerlist := make([]peer.AddrInfo, 0, len(trackedPeersCopy))
for peerID := range trackedPeersCopy {
addrInfo := p.host.Peerstore().PeerInfo(peerID)
peerlist = append(peerlist, addrInfo)
}
if p.peerstore != nil {
err := p.peerstore.Put(p.ctx, peerlist)
if err != nil {
log.Errorf("Failed to persist updated peer list: $w", err)
}
}
}
}
}
Expand Down
56 changes: 43 additions & 13 deletions p2p/peer_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,47 +8,77 @@ import (

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/go-header/p2p/peerstore"
)

func TestPeerTracker_GC(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer t.Cleanup(cancel)

gcCycleDefault = time.Millisecond * 200
maxAwaitingTime = time.Millisecond

h := createMocknet(t, 1)
gcCycle = time.Millisecond * 200
connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
require.NoError(t, err)
p := newPeerTracker(h[0], connGater)
maxAwaitingTime = time.Millisecond
pid1 := peer.ID("peer1")
pid2 := peer.ID("peer2")
pid3 := peer.ID("peer3")
pid4 := peer.ID("peer4")

mockPeerStore := peerstore.NewPeerStore(sync.MutexWrap(datastore.NewMapDatastore()))
p := newPeerTracker(h[0], connGater, mockPeerStore)

peerlist, err := peerstore.GenerateRandomPeerlist(4)
require.NoError(t, err)

pid1 := peerlist[0].ID
pid2 := peerlist[1].ID
pid3 := peerlist[2].ID
pid4 := peerlist[3].ID

// Add peer with low score to test if it will be GC'ed (it should)
p.trackedPeers[pid1] = &peerStat{peerID: pid1, peerScore: 0.5}
// Add peer with high score to test if it won't be GCed (it shouldn't)
p.trackedPeers[pid2] = &peerStat{peerID: pid2, peerScore: 10}

// Add peer such that their prune deadlnie is in the past (after GC cycle time has passed)
// to test if they will be prned (they should)
p.disconnectedPeers[pid3] = &peerStat{peerID: pid3, pruneDeadline: time.Now()}
p.disconnectedPeers[pid4] = &peerStat{peerID: pid4, pruneDeadline: time.Now().Add(time.Minute * 10)}
assert.True(t, len(p.trackedPeers) > 0)
assert.True(t, len(p.disconnectedPeers) > 0)
// Add peer such that their prune deadline is not the past (after GC cycle time has passed)
// to test if they won't be pruned (they shouldn't)
p.disconnectedPeers[pid4] = &peerStat{peerID: pid4, pruneDeadline: time.Now().Add(time.Millisecond * 300)}

go p.track()
go p.gc()

time.Sleep(time.Second * 1)
<-time.After(gcCycleDefault + time.Millisecond*20)

p.peerLk.Lock()
assert.True(t, len(p.trackedPeers) > 0)
assert.True(t, len(p.disconnectedPeers) > 0)
p.peerLk.Unlock()

err = p.stop(context.Background())
require.NoError(t, err)

require.Nil(t, p.trackedPeers[pid1])
require.Nil(t, p.disconnectedPeers[pid3])

assert.Equal(t, pid2, p.trackedPeers[pid2].peerID)

peers, err := mockPeerStore.Load(ctx)
require.NoError(t, err)

assert.Equal(t, peers[0].ID, p.trackedPeers[pid2].peerID)
assert.Equal(t, 1, len(p.trackedPeers))
}

func TestPeerTracker_BlockPeer(t *testing.T) {
h := createMocknet(t, 2)
connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
require.NoError(t, err)
p := newPeerTracker(h[0], connGater)
p := newPeerTracker(h[0], connGater, nil)
maxAwaitingTime = time.Millisecond
p.blockPeer(h[1].ID(), errors.New("test"))
require.Len(t, connGater.ListBlockedPeers(), 1)
Expand Down
13 changes: 13 additions & 0 deletions p2p/peerstore/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package peerstore

import (
"context"

"github.com/libp2p/go-libp2p/core/peer"
)

// Peerstore is an interface for storing and loading libp2p peers' information.
type Peerstore interface {
Put(context.Context, []peer.AddrInfo) error
Load(context.Context) ([]peer.AddrInfo, error)
}
Loading