Skip to content

Commit

Permalink
Sub multiplexing through Filter API
Browse files Browse the repository at this point in the history
  • Loading branch information
vitvly committed Mar 11, 2024
1 parent 4d828bd commit b3eadd4
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 32 deletions.
134 changes: 134 additions & 0 deletions waku/v2/api/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package filter

import (
"context"
"sync"
"time"

"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
)

type FilterConfig struct {
MaxPeers uint
}

type Sub struct {
sync.RWMutex
ContentFilter protocol.ContentFilter
DataCh chan *protocol.Envelope
Config FilterConfig
subs subscription.SubscriptionSet
wf *filter.WakuFilterLightNode
ctx context.Context
cancel context.CancelFunc
}

func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig) (*Sub, error) {
sub := new(Sub)
sub.wf = wf
sub.ctx, sub.cancel = context.WithCancel(ctx)
sub.subs = make(subscription.SubscriptionSet)
sub.DataCh = make(chan *protocol.Envelope)
sub.ContentFilter = contentFilter
sub.Config = config

err := sub.subscribe(contentFilter, sub.Config.MaxPeers)

if err == nil {
sub.healthCheckLoop()
return sub, nil
} else {
return nil, err
}
}

func Unsubscribe(apiSub *Sub) error {
apiSub.RLock()
defer apiSub.RUnlock()
for _, s := range apiSub.subs {
apiSub.wf.UnsubscribeWithSubscription(apiSub.ctx, s)

Check failure on line 51 in waku/v2/api/filter.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `apiSub.wf.UnsubscribeWithSubscription` is not checked (errcheck)
}
apiSub.cancel()
return nil
}

func (apiSub *Sub) healthCheckLoop() {
// Health checks
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-apiSub.ctx.Done():
return
case <-ticker.C:
// Returns a map of pubsub topics to peer counts
m := apiSub.checkAliveness()
for t, cnt := range m {
if cnt < apiSub.Config.MaxPeers {
cFilter := protocol.ContentFilter{t, apiSub.ContentFilter.ContentTopics}

Check failure on line 70 in waku/v2/api/filter.go

View workflow job for this annotation

GitHub Actions / lint

composites: github.com/waku-org/go-waku/waku/v2/protocol.ContentFilter struct literal uses unkeyed fields (govet)
apiSub.subscribe(cFilter, apiSub.Config.MaxPeers-cnt)

Check failure on line 71 in waku/v2/api/filter.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `apiSub.subscribe` is not checked (errcheck)
}
}
}
}
}

func (apiSub *Sub) checkAliveness() map[string]uint {
apiSub.RLock()
defer apiSub.RUnlock()
ch := make(chan string, len(apiSub.subs))
wg := &sync.WaitGroup{}
wg.Add(len(apiSub.subs))
for _, subDetails := range apiSub.subs {
go func(subDetails *subscription.SubscriptionDetails) {
defer wg.Done()
err := apiSub.wf.IsSubscriptionAlive(apiSub.ctx, subDetails)

if err != nil {
subDetails.Close()
apiSub.Lock()
defer apiSub.Unlock()
delete(apiSub.subs, subDetails.ID)
} else {
ch <- subDetails.ContentFilter.PubsubTopic
}
}(subDetails)

}
wg.Wait()
close(ch)
// Collect healthy topics
m := make(map[string]uint)
for topic := range ch {
m[topic]++
}

return m

}
func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount uint) error {
// Low-level subscribe, returns a set of SubscriptionDetails
subs, err := apiSub.wf.Subscribe(apiSub.ctx, contentFilter, filter.WithMaxPeersPerContentFilter(int(peerCount)))
if err != nil {
// TODO what if fails?
return err
}
apiSub.Lock()
defer apiSub.Unlock()
for _, s := range subs {
apiSub.subs[s.ID] = s
}
// Multiplex onto single channel
// Goroutines will exit once sub channels are closed
for _, subDetails := range subs {
go func(subDetails *subscription.SubscriptionDetails) {
for env := range subDetails.C {
apiSub.DataCh <- env
}
}(subDetails)
}
return nil

}
2 changes: 1 addition & 1 deletion waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ func (w *WakuNode) Peers() ([]*Peer, error) {
Protocols: protocols,
Connected: connected,
Addrs: addrs,
PubsubTopics: topics,
PubsubTopics: maps.Keys(topics),
})
}
return peers, nil
Expand Down
3 changes: 2 additions & 1 deletion waku/v2/peermanager/topic_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

func (pm *PeerManager) SubscribeToRelayEvtBus(bus event.Bus) error {
Expand Down Expand Up @@ -102,7 +103,7 @@ func (pm *PeerManager) handleNewRelayTopicUnSubscription(pubsubTopic string) {
logging.HostID("peerID", peer))
continue
}
if len(peerTopics) == 1 && peerTopics[0] == pubsubTopic {
if len(peerTopics) == 1 && maps.Keys(peerTopics)[0] == pubsubTopic {
err := pm.host.Network().ClosePeer(peer)
if err != nil {
pm.logger.Warn("Failed to disconnect connection towards peer",
Expand Down
50 changes: 20 additions & 30 deletions waku/v2/peerstore/waku_peer_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"golang.org/x/exp/maps"
)

// Origin is used to determine how the peer is identified,
Expand Down Expand Up @@ -58,7 +60,7 @@ type WakuPeerstore interface {

AddPubSubTopic(p peer.ID, topic string) error
RemovePubSubTopic(p peer.ID, topic string) error
PubSubTopics(p peer.ID) ([]string, error)
PubSubTopics(p peer.ID) (protocol.TopicSet, error)
SetPubSubTopics(p peer.ID, topics []string) error
PeersByPubSubTopics(pubSubTopics []string, specificPeers ...peer.ID) peer.IDSlice
PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice
Expand Down Expand Up @@ -175,13 +177,12 @@ func (ps *WakuPeerstoreImpl) AddPubSubTopic(p peer.ID, topic string) error {
if err != nil {
return err
}
for _, t := range existingTopics {
if t == topic {
return nil
}

if _, found := existingTopics[topic]; found {
return nil
}
existingTopics = append(existingTopics, topic)
return ps.peerStore.Put(p, peerPubSubTopics, existingTopics)
existingTopics[topic] = struct{}{}
return ps.peerStore.Put(p, peerPubSubTopics, maps.Keys(existingTopics))
}

// RemovePubSubTopic removes a pubSubTopic from the peer
Expand All @@ -195,14 +196,9 @@ func (ps *WakuPeerstoreImpl) RemovePubSubTopic(p peer.ID, topic string) error {
return nil
}

for i := range existingTopics {
if existingTopics[i] == topic {
existingTopics = append(existingTopics[:i], existingTopics[i+1:]...)
break
}
}
delete(existingTopics, topic)

err = ps.SetPubSubTopics(p, existingTopics)
err = ps.SetPubSubTopics(p, maps.Keys(existingTopics))
if err != nil {
return err
}
Expand All @@ -215,16 +211,16 @@ func (ps *WakuPeerstoreImpl) SetPubSubTopics(p peer.ID, topics []string) error {
}

// PubSubTopics fetches list of pubSubTopics for a peer
func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) ([]string, error) {
func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) (protocol.TopicSet, error) {
result, err := ps.peerStore.Get(p, peerPubSubTopics)
if err != nil {
if errors.Is(err, peerstore.ErrNotFound) {
return nil, nil
return protocol.NewTopicSet(), nil
} else {
return nil, err
}
}
return result.([]string), nil
return protocol.NewTopicSet((result.([]string))...), nil
}

// PeersByPubSubTopic Returns list of peers that support list of pubSubTopics
Expand All @@ -235,22 +231,16 @@ func (ps *WakuPeerstoreImpl) PeersByPubSubTopics(pubSubTopics []string, specific
}
var result peer.IDSlice
for _, p := range specificPeers {
topics, err := ps.PubSubTopics(p)
peerMatch := true
peerTopics, err := ps.PubSubTopics(p)
if err == nil {
//Convoluted and crazy logic to find subset of topics
// Could not find a better way to do it?
peerTopicMap := make(map[string]struct{})
match := true
for _, topic := range topics {
peerTopicMap[topic] = struct{}{}
}
for _, topic := range pubSubTopics {
if _, ok := peerTopicMap[topic]; !ok {
match = false
for _, t := range pubSubTopics {
if _, ok := peerTopics[t]; !ok {
peerMatch = false
break
}
}
if match {
if peerMatch {
result = append(result, p)
}
} //Note: skipping a peer in case of an error as there would be others available.
Expand All @@ -268,7 +258,7 @@ func (ps *WakuPeerstoreImpl) PeersByPubSubTopic(pubSubTopic string, specificPeer
for _, p := range specificPeers {
topics, err := ps.PubSubTopics(p)
if err == nil {
for _, topic := range topics {
for topic := range topics {
if topic == pubSubTopic {
result = append(result, p)
}
Expand Down
9 changes: 9 additions & 0 deletions waku/v2/protocol/content_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ type PubsubTopicStr = string
type ContentTopicStr = string

type ContentTopicSet map[string]struct{}
type TopicSet map[string]struct{}

func NewTopicSet(topics ...string) TopicSet {
s := make(TopicSet, len(topics))
for _, t := range topics {
s[t] = struct{}{}
}
return s
}

func NewContentTopicSet(contentTopics ...string) ContentTopicSet {
s := make(ContentTopicSet, len(contentTopics))
Expand Down

0 comments on commit b3eadd4

Please sign in to comment.