Skip to content

Commit

Permalink
batch process dead peer notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
vyzo committed Jul 16, 2021
1 parent d5437b9 commit 4c1432a
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 43 deletions.
35 changes: 22 additions & 13 deletions comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,31 @@ func (p *PubSub) handleNewStream(s network.Stream) {
}
}

func (p *PubSub) notifyPeerDead(pid peer.ID) {
p.peerDeadPrioLk.RLock()
p.peerDeadMx.Lock()
p.peerDeadPend[pid] = struct{}{}
p.peerDeadMx.Unlock()
p.peerDeadPrioLk.RUnlock()

select {
case p.peerDead <- struct{}{}:
default:
}
}

func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) {
s, err := p.host.NewStream(p.ctx, pid, p.rt.Protocols()...)
if err != nil {
log.Debug("opening new stream to peer: ", err, pid)

var ch chan peer.ID
if err == ms.ErrNotSupported {
ch = p.newPeerError
select {
case p.newPeerError <- pid:
case <-ctx.Done():
}
} else {
ch = p.peerDead
}

select {
case ch <- pid:
case <-ctx.Done():
p.notifyPeerDead(pid)
}
return
}
Expand All @@ -116,18 +126,17 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan
}

func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) {
pid := s.Conn().RemotePeer()
r := protoio.NewDelimitedReader(s, p.maxMessageSize)
rpc := new(RPC)
for {
err := r.ReadMsg(&rpc.RPC)
if err != nil {
select {
case p.peerDead <- s.Conn().RemotePeer():
case <-ctx.Done():
}
p.notifyPeerDead(pid)
return
}
log.Debugf("unexpected message from %s", s.Conn().RemotePeer())

log.Debugf("unexpected message from %s", pid)
}
}

Expand Down
80 changes: 50 additions & 30 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ type PubSub struct {
newPeerError chan peer.ID

// a notification channel for when our peers die
peerDead chan peer.ID
peerDead chan struct{}
peerDeadPrioLk sync.RWMutex
peerDeadMx sync.Mutex
peerDeadPend map[peer.ID]struct{}

// The set of topics we are subscribed to
mySubs map[string]map[*Subscription]struct{}
Expand Down Expand Up @@ -242,7 +245,8 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
newPeersPend: make(map[peer.ID]struct{}),
newPeerStream: make(chan network.Stream),
newPeerError: make(chan peer.ID),
peerDead: make(chan peer.ID),
peerDead: make(chan struct{}, 1),
peerDeadPend: make(map[peer.ID]struct{}),
cancelCh: make(chan *Subscription),
getPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq),
Expand Down Expand Up @@ -512,34 +516,8 @@ func (p *PubSub) processLoop(ctx context.Context) {
case pid := <-p.newPeerError:
delete(p.peers, pid)

case pid := <-p.peerDead:
ch, ok := p.peers[pid]
if !ok {
continue
}

close(ch)

if p.host.Network().Connectedness(pid) == network.Connected {
// still connected, must be a duplicate connection being closed.
// we respawn the writer as we need to ensure there is a stream active
log.Warn("peer declared dead but still connected; respawning writer: ", pid)
messages := make(chan *RPC, p.peerOutboundQueueSize)
messages <- p.getHelloPacket()
go p.handleNewPeer(ctx, pid, messages)
p.peers[pid] = messages
continue
}

delete(p.peers, pid)
for t, tmap := range p.topics {
if _, ok := tmap[pid]; ok {
delete(tmap, pid)
p.notifyLeave(t, pid)
}
}

p.rt.RemovePeer(pid)
case <-p.peerDead:
p.handleDeadPeers()

case treq := <-p.getTopics:
var out []string
Expand Down Expand Up @@ -652,6 +630,48 @@ func (p *PubSub) handlePendingPeers() {
}
}

func (p *PubSub) handleDeadPeers() {
p.peerDeadPrioLk.Lock()
defer p.peerDeadPrioLk.Unlock()

if len(p.peerDeadPend) == 0 {
return
}

deadPeers := p.peerDeadPend
p.peerDeadPend = make(map[peer.ID]struct{})

for pid := range deadPeers {
ch, ok := p.peers[pid]
if !ok {
continue
}

close(ch)

if p.host.Network().Connectedness(pid) == network.Connected {
// still connected, must be a duplicate connection being closed.
// we respawn the writer as we need to ensure there is a stream active
log.Debugf("peer declared dead but still connected; respawning writer: %s", pid)
messages := make(chan *RPC, p.peerOutboundQueueSize)
messages <- p.getHelloPacket()
go p.handleNewPeer(p.ctx, pid, messages)
p.peers[pid] = messages
continue
}

delete(p.peers, pid)
for t, tmap := range p.topics {
if _, ok := tmap[pid]; ok {
delete(tmap, pid)
p.notifyLeave(t, pid)
}
}

p.rt.RemovePeer(pid)
}
}

// handleAddTopic adds a tracker for a particular topic.
// Only called from processLoop.
func (p *PubSub) handleAddTopic(req *addTopicReq) {
Expand Down

0 comments on commit 4c1432a

Please sign in to comment.