broadcaster.go
package cassette

import (
"context"
"math"
"time"
"github.com/ipfs/boxo/bitswap/message"
bitswap_message_pb "github.com/ipfs/boxo/bitswap/message/pb"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/mercari/go-circuitbreaker"
)

var bitswapOneTwo protocol.ID = "/ipfs/bitswap/1.2.0"

type (
// broadcaster fans CID lookups out to a dynamic set of per-recipient senders. It is driven by
// commands (addRecipient, removeRecipient, findCids) received on its mailbox.
broadcaster struct {
mailbox chan any
refreshTicker *time.Ticker
c *Cassette
}
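
// addRecipient instructs the broadcaster to start broadcasting to the given peer.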
addRecipient struct {
id peer.ID
}
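
// removeRecipient instructs the broadcaster to stop broadcasting to the given peer.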
removeRecipient struct {
id peer.ID
}
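
// findCids asks the broadcaster to relay a lookup for the given CIDs to every current recipient.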
findCids struct {
cids []cid.Cid
timestamp time.Time
}
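
// channeledSender batches lookups destined for a single recipient and sends them as bitswap
// want messages, guarded by a per-recipient circuit breaker.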
channeledSender struct {
ctx context.Context
cancel context.CancelFunc
id peer.ID
mailbox chan findCids
c *Cassette
unsentTimestamp time.Time
// unsentCids maps each CID that has not been sent yet to whether it should be sent as a cancel.
unsentCids map[cid.Cid]bool
maxBatchSize int
maxBatchWait *time.Ticker
cancelAfter *time.Ticker
sendTimeout time.Duration
cb *circuitbreaker.CircuitBreaker
}
)
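
// newBroadcaster constructs a broadcaster for c with a buffered command mailbox and a ticker
// that periodically triggers a refresh of the broadcast recipient list.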
func newBroadcaster(c *Cassette) *broadcaster {
return &broadcaster{
c: c,
mailbox: make(chan any, c.broadcastChannelBuffer),
refreshTicker: time.NewTicker(c.recipientsRefreshInterval),
}
}
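
// start launches two goroutines: one periodically refreshes the broadcast recipients from the
// host's peerstore, and one dispatches mailbox commands to the per-recipient senders until ctx
// is cancelled.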
func (b *broadcaster) start(ctx context.Context) {
go func() {
// Stop the refresh ticker when this goroutine exits.
defer b.refreshTicker.Stop()
refresh := func() {
logger.Info("Refreshing broadcast list...")
peers := b.c.h.Peerstore().Peers()
var count int
for _, id := range peers {
select {
case <-ctx.Done():
logger.Info("Refresh disrupted")
return
default:
if id != b.c.h.ID() {
b.mailbox <- addRecipient{
id: id,
}
count++
}
}
}
logger.Infow("Broadcast list refreshed", "size", count)
}
refresh()
for {
select {
case <-ctx.Done():
logger.Info("Stopping broadcast recipient refresh")
return
case <-b.refreshTicker.C:
refresh()
}
}
}()
go func() {
recipients := make(map[peer.ID]*channeledSender)
defer func() {
logger.Infow("Stopping broadcast...")
for id, sender := range recipients {
sender.shutdown()
delete(recipients, id)
logger.Infow("Stopped broadcast to peer", "peer", id)
}
logger.Infow("Broadcasting stopped.")
}()
for {
select {
case <-ctx.Done():
return
case cmd := <-b.mailbox:
switch c := cmd.(type) {
case findCids:
for _, recipient := range recipients {
select {
case <-ctx.Done():
return
case recipient.mailbox <- c:
b.c.metrics.notifyBroadcastRequested(context.Background(), int64(len(c.cids)))
}
}
case addRecipient:
if _, exists := recipients[c.id]; exists {
continue
}
cs := b.newChanneledSender(c.id)
go cs.start()
recipients[c.id] = cs
b.c.metrics.notifyBroadcastRecipientAdded(context.Background())
case removeRecipient:
if cs, exists := recipients[c.id]; exists {
cs.shutdown()
delete(recipients, c.id)
b.c.metrics.notifyBroadcastRecipientRemoved(context.Background())
}
}
}
}
}()
}
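
// start consumes findCids commands from the mailbox, accumulating unsent CIDs and flushing
// them once the batch reaches maxBatchSize or the maxBatchWait ticker fires. CIDs that have
// already been sent are re-queued as cancel entries when the cancelAfter ticker fires.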
func (cs *channeledSender) start() {
cancellations := make(map[cid.Cid]struct{})
for {
select {
case <-cs.ctx.Done():
return
case fc, ok := <-cs.mailbox:
if !ok {
return
}
if cs.unsentTimestamp.IsZero() || cs.unsentTimestamp.After(fc.timestamp) {
cs.unsentTimestamp = fc.timestamp
}
for _, c := range fc.cids {
if _, exists := cs.unsentCids[c]; !exists {
// Add as unsent with cancel set to false.
cs.unsentCids[c] = false
// Add to cancellations to be marked for cancellation later.
cancellations[c] = struct{}{}
}
}
if len(cs.unsentCids) >= cs.maxBatchSize {
cs.sendUnsent()
}
case <-cs.maxBatchWait.C:
if len(cs.unsentCids) != 0 {
cs.sendUnsent()
}
case <-cs.cancelAfter.C:
if len(cancellations) != 0 {
for c := range cancellations {
// Once a CID has been broadcast it is removed from the unsent CIDs. Therefore, only mark
// CIDs for cancellation if they are no longer present in the unsent CIDs. This won't work
// for CIDs that are repeatedly looked up within a short period of time. However, Cassette
// typically sits behind a caching layer that should cover this edge case.
// TODO: add caching inside Cassette to make sure this is covered regardless of
// deployment topology.
if _, ok := cs.unsentCids[c]; !ok {
cs.unsentCids[c] = true
}
delete(cancellations, c)
}
}
}
}
}
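
// supportsHaves reports whether the recipient speaks bitswap protocol version 1.2.0 and can
// therefore handle Want-Have entries.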
func (cs *channeledSender) supportsHaves(ctx context.Context) bool {
// Ensure a connection to the peer before checking its protocol list. Otherwise, GetProtocols
// silently returns an empty list.
if addrs := cs.c.h.Peerstore().Addrs(cs.id); len(addrs) == 0 {
return false
} else if err := cs.c.h.Connect(ctx, peer.AddrInfo{ID: cs.id, Addrs: addrs}); err != nil {
logger.Errorw("Failed to connect to peer in order to determine Want-Haves support", "peer", cs.id, "err", err)
return false
}
protocols, err := cs.c.h.Peerstore().GetProtocols(cs.id)
if err != nil {
return false
}
for _, p := range protocols {
if p == bitswapOneTwo {
return true
}
}
return false
}
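
// shutdown stops the sender by cancelling its context, stopping its tickers and closing its
// mailbox.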
func (cs *channeledSender) shutdown() {
cs.cancel()
// Stop both tickers so that their resources are released.
cs.maxBatchWait.Stop()
cs.cancelAfter.Stop()
close(cs.mailbox)
}
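
// sendUnsent flushes the accumulated unsent CIDs to the recipient as a single bitswap message,
// using Want-Have entries when supported (or Want-Block entries if fallback is enabled) and
// cancel entries for CIDs marked as cancelled. The outcome is recorded on the recipient's
// circuit breaker and in metrics.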
func (cs *channeledSender) sendUnsent() {
ctx, cancel := context.WithTimeout(cs.ctx, cs.sendTimeout)
defer func() {
cancel()
cs.unsentTimestamp = time.Now()
}()
totalCidCount := int64(len(cs.unsentCids))
if !cs.cb.Ready() {
// Clear the unsent CIDs: by the time the circuit breaker becomes ready again, the
// corresponding lookup requests will most likely have timed out anyway.
cs.unsentCids = make(map[cid.Cid]bool)
cs.c.metrics.notifyBroadcastFailed(context.Background(), totalCidCount, circuitbreaker.ErrOpen, time.Since(cs.unsentTimestamp))
return
}
var wantHave bool
var wlt bitswap_message_pb.Message_Wantlist_WantType
if cs.supportsHaves(ctx) {
wlt = bitswap_message_pb.Message_Wantlist_Have
wantHave = true
} else if cs.c.fallbackOnWantBlock {
wlt = bitswap_message_pb.Message_Wantlist_Block
} else {
logger.Warnw("Peer does not support Want-Haves and fallback on Want-Blocks is disabled. Skipping broadcast.", "peer", cs.id, "skipped", len(cs.unsentCids))
// Clear unsent CIDs.
cs.unsentCids = make(map[cid.Cid]bool)
cs.c.metrics.notifyBroadcastSkipped(context.Background(), totalCidCount, time.Since(cs.unsentTimestamp))
// Fail the circuit breaker so that we do not keep re-checking Want-Haves support on every
// send; support for it rarely changes, and once the breaker opens the recipient is removed
// from the broadcast list.
cs.cb.Fail()
return
}
var cancelCount int64
msg := message.New(false)
for c, cancel := range cs.unsentCids {
if cancel {
msg.Cancel(c)
cancelCount++
} else {
msg.AddEntry(c, math.MaxInt32, wlt, false)
}
delete(cs.unsentCids, c)
}
if err := cs.c.bsn.SendMessage(ctx, cs.id, msg); err != nil {
logger.Errorw("Failed to send message", "to", cs.id, "err", err)
cs.c.metrics.notifyBroadcastFailed(context.Background(), totalCidCount, err, time.Since(cs.unsentTimestamp))
// SendMessage wraps the context internally with the configured bitswap timeouts. Therefore,
// if that internal deadline is exceeded, FailWithContext will fail the circuit breaker
// regardless of what the WithFailOnContextDeadline option is set to, because the deadline is
// never observed on ctx itself. This is fine, since we do want it to fail.
// Call FailWithContext anyway so that WithFailOnContextCancel is respected.
cs.cb.FailWithContext(ctx)
} else {
cs.c.metrics.notifyBroadcastSucceeded(context.Background(), totalCidCount, cancelCount, wantHave, time.Since(cs.unsentTimestamp))
cs.cb.Success()
}
}
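
// newChanneledSender creates a sender for the given recipient with its own context and circuit
// breaker. When the circuit breaker opens, its state-change hook asks the broadcaster to remove
// the recipient.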
func (b *broadcaster) newChanneledSender(id peer.ID) *channeledSender {
cs := channeledSender{
id: id,
mailbox: make(chan findCids, b.c.broadcastSendChannelBuffer),
c: b.c,
unsentCids: make(map[cid.Cid]bool),
maxBatchSize: b.c.maxBroadcastBatchSize,
maxBatchWait: time.NewTicker(b.c.maxBroadcastBatchWait),
cancelAfter: time.NewTicker(b.c.broadcastCancelAfter),
sendTimeout: b.c.recipientSendTimeout,
}
cs.ctx, cs.cancel = context.WithCancel(context.Background())
cs.cb = circuitbreaker.New(
circuitbreaker.WithFailOnContextCancel(b.c.recipientCBFailOnContextCancel),
circuitbreaker.WithFailOnContextDeadline(b.c.recipientCBFailOnContextDeadline),
circuitbreaker.WithHalfOpenMaxSuccesses(b.c.recipientCBHalfOpenMaxSuccesses),
circuitbreaker.WithOpenTimeoutBackOff(b.c.recipientCBOpenTimeoutBackOff),
circuitbreaker.WithOpenTimeout(b.c.recipientCBOpenTimeout),
circuitbreaker.WithCounterResetInterval(b.c.recipientCBCounterResetInterval),
circuitbreaker.WithTripFunc(b.c.recipientCBTripFunc),
circuitbreaker.WithOnStateChangeHookFn(func(from, to circuitbreaker.State) {
b.c.metrics.notifyBroadcastRecipientCBStateChanged(context.Background(), id, from, to)
logger.Infow("Broadcast recipient circuit breaker state changed", "recipient", id, "from", from, "to", to)
switch to {
case circuitbreaker.StateOpen:
b.mailbox <- removeRecipient{id: id}
logger.Warnw("Removing recipient with opened circuit breaker", "recipient", id, "from", from, "to", to)
}
}),
)
return &cs
}
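
// broadcastWant enqueues the given CIDs for broadcast to all current recipients, returning
// ctx.Err() if ctx is done before the command can be enqueued.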
func (b *broadcaster) broadcastWant(ctx context.Context, c []cid.Cid) error {
select {
case <-ctx.Done():
return ctx.Err()
case b.mailbox <- findCids{cids: c, timestamp: time.Now()}:
return nil
}
}