forked from lightninglabs/neutrino
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathblockmanager.go
2403 lines (2068 loc) · 72.6 KB
/
blockmanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// NOTE: THIS API IS UNSTABLE RIGHT NOW AND WILL GO MOSTLY PRIVATE SOON.
package neutrino
import (
"bytes"
"container/list"
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcutil/gcs"
"github.com/btcsuite/btcutil/gcs/builder"
"github.com/lightninglabs/neutrino/headerfs"
"github.com/lightninglabs/neutrino/headerlist"
)
const (
	// maxTimeOffset is the maximum duration a block time is allowed to be
	// ahead of the current time. This is currently 2 hours.
	maxTimeOffset = 2 * time.Hour
	// numMaxMemHeaders is the max number of headers to store in memory for
	// a particular peer. By bounding this value, we're able to closely
	// control our effective memory usage during initial sync and re-org
	// handling. This value should be set to a "sane" re-org size, such
	// that we're able to properly handle re-orgs in size strictly less
	// than this value.
	numMaxMemHeaders = 10000
)
// filterStoreLookup is a function that, given the chain service, returns the
// filter header store that should be used for a particular filter type.
type filterStoreLookup func(*ChainService) *headerfs.FilterHeaderStore
var (
	// filterTypes is a map of filter types to synchronize to a lookup
	// function for the service's store for that filter type. Currently
	// only the regular (basic) filter type is synchronized.
	filterTypes = map[wire.FilterType]filterStoreLookup{
		wire.GCSFilterRegular: func(
			s *ChainService) *headerfs.FilterHeaderStore {
			return s.RegFilterHeaders
		},
	}
)
// zeroHash is the zero value hash (all zeros). It is defined as a convenience.
var zeroHash chainhash.Hash

// newPeerMsg signifies a newly connected peer to the block handler.
type newPeerMsg struct {
	// peer is the peer that just finished negotiation.
	peer *ServerPeer
}

// invMsg packages a bitcoin inv message and the peer it came from together
// so the block handler has access to that information.
type invMsg struct {
	inv  *wire.MsgInv
	peer *ServerPeer
}

// headersMsg packages a bitcoin headers message and the peer it came from
// together so the block handler has access to that information.
type headersMsg struct {
	headers *wire.MsgHeaders
	peer    *ServerPeer
}

// donePeerMsg signifies a newly disconnected peer to the block handler.
type donePeerMsg struct {
	peer *ServerPeer
}

// txMsg packages a bitcoin tx message and the peer it came from together
// so the block handler has access to that information.
type txMsg struct {
	tx   *btcutil.Tx
	peer *ServerPeer
}
// blockManager provides a concurrency safe block manager for handling all
// incoming blocks.
type blockManager struct {
	// started and shutdown are accessed atomically to guard one-time
	// Start/Stop semantics.
	started  int32
	shutdown int32

	// blkHeaderProgressLogger is a progress logger that we'll use to
	// update the number of block headers we've processed in the past 10
	// seconds within the log.
	blkHeaderProgressLogger *headerProgressLogger

	// fltrHeaderProgessLogger is a progress logger similar to the one
	// above, but we'll use it to update the progress of the set of filter
	// headers that we've verified in the past 10 seconds.
	fltrHeaderProgessLogger *headerProgressLogger

	// headerTip will be set to the current block header tip at all times.
	// Callers MUST hold the lock below each time they read/write from
	// this field.
	headerTip uint32

	// headerTipHash will be set to the hash of the current block header
	// tip at all times. Callers MUST hold the lock below each time they
	// read/write from this field.
	headerTipHash chainhash.Hash

	// newHeadersMtx is the mutex that should be held when reading/writing
	// the headerTip variable above.
	newHeadersMtx sync.RWMutex

	// newHeadersSignal is condition variable which will be used to notify
	// any waiting callers (via Broadcast()) that the tip of the current
	// chain has changed. This is useful when callers need to know we have
	// a new tip, but not necessarily each block that was connected during
	// switch over.
	newHeadersSignal *sync.Cond

	// filterHeaderTip will be set to the current filter header tip at all
	// times. Callers MUST hold the lock below each time they read/write
	// from this field.
	filterHeaderTip uint32

	// filterHeaderTipHash will be set to the current block hash of the
	// filter header tip at all times. Callers MUST hold the lock below
	// each time they read/write from this field.
	filterHeaderTipHash chainhash.Hash

	// newFilterHeadersMtx is the mutex that should be held when
	// reading/writing the filterHeaderTip variable above.
	newFilterHeadersMtx sync.RWMutex

	// newFilterHeadersSignal is condition variable which will be used to
	// notify any waiting callers (via Broadcast()) that the tip of the
	// current filter header chain has changed. This is useful when callers
	// need to know we have a new tip, but not necessarily each filter
	// header that was connected during switch over.
	newFilterHeadersSignal *sync.Cond

	// syncPeer points to the peer that we're currently syncing block
	// headers from.
	syncPeer *ServerPeer

	// syncPeerMutex protects the above syncPeer pointer at all times.
	syncPeerMutex sync.RWMutex

	// server is a pointer to the main p2p server for Neutrino, we'll use
	// this pointer at times to do things like access the database, etc
	server *ChainService

	// peerChan is a channel for messages that come from peers
	peerChan chan interface{}

	wg   sync.WaitGroup
	quit chan struct{}

	// headerList is a bounded in-memory chain of the most recently
	// connected block headers (capacity numMaxMemHeaders, set in
	// newBlockManager), used during sync and re-org handling.
	headerList headerlist.Chain

	// reorgList is a second bounded in-memory chain.
	// NOTE(review): appears to hold headers for a candidate re-org
	// branch — confirm against the header handlers elsewhere in the file.
	reorgList headerlist.Chain

	// startHeader marks a position within headerList.
	// NOTE(review): presumably the first header still pending further
	// processing — confirm against the block handler.
	startHeader *headerlist.Node

	// nextCheckpoint is the next hard-coded header checkpoint above the
	// current chain tip, as computed by findNextHeaderCheckpoint.
	nextCheckpoint *chaincfg.Checkpoint

	// lastRequested is a block hash tracked across requests.
	// NOTE(review): likely the stop hash of the last getheaders request —
	// confirm against the headers handler.
	lastRequested chainhash.Hash

	minRetargetTimespan int64 // target timespan / adjustment factor
	maxRetargetTimespan int64 // target timespan * adjustment factor
	blocksPerRetarget   int32 // target timespan / target time per block
}
// newBlockManager returns a new bitcoin block manager. Use Start to begin
// processing asynchronous block and inv updates.
func newBlockManager(s *ChainService) (*blockManager, error) {
	targetTimespan := int64(s.chainParams.TargetTimespan / time.Second)
	targetTimePerBlock := int64(s.chainParams.TargetTimePerBlock / time.Second)
	adjustmentFactor := s.chainParams.RetargetAdjustmentFactor

	bm := blockManager{
		server:   s,
		peerChan: make(chan interface{}, MaxPeers*3),
		blkHeaderProgressLogger: newBlockProgressLogger(
			"Processed", "block", log,
		),
		fltrHeaderProgessLogger: newBlockProgressLogger(
			"Verified", "filter header", log,
		),
		headerList: headerlist.NewBoundedMemoryChain(
			numMaxMemHeaders,
		),
		reorgList: headerlist.NewBoundedMemoryChain(
			numMaxMemHeaders,
		),
		quit:                make(chan struct{}),
		blocksPerRetarget:   int32(targetTimespan / targetTimePerBlock),
		minRetargetTimespan: targetTimespan / adjustmentFactor,
		maxRetargetTimespan: targetTimespan * adjustmentFactor,
	}

	// Next we'll create the two signals that goroutines will use to wait
	// on a particular header chain height before starting their normal
	// duties.
	bm.newHeadersSignal = sync.NewCond(&bm.newHeadersMtx)
	bm.newFilterHeadersSignal = sync.NewCond(&bm.newFilterHeadersMtx)

	// Initialize the next checkpoint based on the current height.
	header, height, err := s.BlockHeaders.ChainTip()
	if err != nil {
		return nil, err
	}
	bm.nextCheckpoint = bm.findNextHeaderCheckpoint(int32(height))
	bm.headerList.ResetHeaderState(headerlist.Node{
		Header: *header,
		Height: int32(height),
	})
	bm.headerTip = height
	bm.headerTipHash = header.BlockHash()

	// Finally, we'll set the filter header tip so any goroutines waiting
	// on the condition obtain the correct initial state.
	_, bm.filterHeaderTip, err = s.RegFilterHeaders.ChainTip()
	if err != nil {
		return nil, err
	}

	// BUG FIX: the filter header tip hash must be the block hash at the
	// *filter* header tip height. The block header chain may be ahead of
	// the filter header chain on restart, in which case the previous code
	// (which reused the block header tip hash) would record a hash that
	// doesn't correspond to filterHeaderTip at all, confusing the
	// tip-comparison loop in cfHandler.
	fh, err := s.BlockHeaders.FetchHeaderByHeight(bm.filterHeaderTip)
	if err != nil {
		return nil, err
	}
	bm.filterHeaderTipHash = fh.BlockHash()

	return &bm, nil
}
// Start begins the core block handler which processes block and inv messages.
// Calling Start more than once has no effect.
func (b *blockManager) Start() {
	// Only the very first increment (0 -> 1) proceeds; every later call
	// observes a value greater than one and bails out.
	alreadyRunning := atomic.AddInt32(&b.started, 1) != 1
	if alreadyRunning {
		return
	}

	log.Trace("Starting block manager")

	b.wg.Add(2)
	go b.blockHandler()
	go b.cfHandler()
}
// Stop gracefully shuts down the block manager by stopping all asynchronous
// handlers and waiting for them to finish. Only the first call performs the
// shutdown; repeated calls are logged and ignored.
func (b *blockManager) Stop() error {
	if atomic.AddInt32(&b.shutdown, 1) != 1 {
		log.Warnf("Block manager is already in the process of " +
			"shutting down")
		return nil
	}

	// Repeatedly wake any goroutines blocked on the header signals so
	// they notice the closed quit channel and exit. The broadcaster runs
	// until the handlers have fully wound down.
	done := make(chan struct{})
	go func() {
		ticker := time.NewTicker(50 * time.Millisecond)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				b.newHeadersSignal.Broadcast()
				b.newFilterHeadersSignal.Broadcast()
			case <-done:
				return
			}
		}
	}()

	log.Infof("Block manager shutting down")
	close(b.quit)
	b.wg.Wait()
	close(done)

	return nil
}
// NewPeer informs the block manager of a newly active peer.
func (b *blockManager) NewPeer(sp *ServerPeer) {
	// Drop the notification entirely once shutdown has begun.
	if atomic.LoadInt32(&b.shutdown) != 0 {
		return
	}

	msg := &newPeerMsg{peer: sp}
	select {
	case <-b.quit:
	case b.peerChan <- msg:
	}
}
// handleNewPeerMsg deals with new peers that have signalled they may be
// considered as a sync peer (they have already successfully negotiated). It
// also starts syncing if needed. It is invoked from the syncHandler
// goroutine.
func (b *blockManager) handleNewPeerMsg(peers *list.List, sp *ServerPeer) {
	// Ignore if in the process of shutting down.
	if atomic.LoadInt32(&b.shutdown) != 0 {
		return
	}

	log.Infof("New valid peer %s (%s)", sp, sp.UserAgent())

	// Ignore the peer if it's not a sync candidate.
	if !b.isSyncCandidate(sp) {
		return
	}

	// Add the peer as a candidate to sync from.
	peers.PushBack(sp)

	// If we're current with our sync peer and the new peer is advertising
	// a higher block than the newest one we know of, request headers from
	// the new peer.
	_, height, err := b.server.BlockHeaders.ChainTip()
	if err != nil {
		log.Criticalf("Couldn't retrieve block header chain tip: %s",
			err)
		return
	}
	if b.IsCurrent() && height < uint32(sp.StartingHeight()) {
		locator, err := b.server.BlockHeaders.LatestBlockLocator()
		if err != nil {
			log.Criticalf("Couldn't retrieve latest block "+
				"locator: %s", err)
			return
		}
		stopHash := &zeroHash

		// BUG FIX: the error returned by PushGetHeadersMsg was
		// previously discarded, hiding the fact that the request was
		// never sent. We only log it, as a failed push to one peer
		// isn't fatal — startSync below can still pick a sync peer.
		if err := sp.PushGetHeadersMsg(locator, stopHash); err != nil {
			log.Warnf("Couldn't send getheaders message to "+
				"peer %s: %s", sp, err)
		}
	}

	// Start syncing by choosing the best candidate if needed.
	b.startSync(peers)
}
// DonePeer informs the blockmanager that a peer has disconnected.
func (b *blockManager) DonePeer(sp *ServerPeer) {
	// Drop the notification entirely once shutdown has begun.
	if atomic.LoadInt32(&b.shutdown) != 0 {
		return
	}

	msg := &donePeerMsg{peer: sp}
	select {
	case <-b.quit:
	case b.peerChan <- msg:
	}
}
// handleDonePeerMsg deals with peers that have signalled they are done. It
// removes the peer as a candidate for syncing and in the case where it was the
// current sync peer, attempts to select a new best peer to sync from. It is
// invoked from the syncHandler goroutine.
func (b *blockManager) handleDonePeerMsg(peers *list.List, sp *ServerPeer) {
	// Remove the peer from the list of candidate peers.
	for e := peers.Front(); e != nil; e = e.Next() {
		if e.Value == sp {
			peers.Remove(e)
			break
		}
	}

	log.Infof("Lost peer %s", sp)

	// Attempt to find a new peer to sync from if the quitting peer is the
	// sync peer. Also, reset the header state.
	if b.SyncPeer() != nil && b.SyncPeer() == sp {
		b.syncPeerMutex.Lock()
		b.syncPeer = nil
		b.syncPeerMutex.Unlock()

		header, height, err := b.server.BlockHeaders.ChainTip()
		if err != nil {
			// BUG FIX: this error was previously swallowed with a
			// bare return, silently leaving the manager without a
			// sync peer. Log it so the failure is visible.
			log.Warnf("Couldn't retrieve block header chain tip "+
				"while resetting header state: %s", err)
			return
		}
		b.headerList.ResetHeaderState(headerlist.Node{
			Header: *header,
			Height: int32(height),
		})
		b.startSync(peers)
	}
}
// cfHandler is the cfheader download handler for the block manager. It must be
// run as a goroutine. It requests and processes cfheaders messages in a
// separate goroutine from the peer handlers.
func (b *blockManager) cfHandler() {
	// If a loop ends with a quit, we want to signal that the goroutine is
	// done.
	defer func() {
		log.Trace("Committed filter header handler done")
		b.wg.Done()
	}()

	log.Infof("Waiting for block headers to sync, then will start " +
		"cfheaders sync...")

	// We'll wait until the main header sync is mostly finished before we
	// actually start to sync the set of cfheaders. We do this to speed up
	// the sync, as the checkpointed sync is faster than fetching each
	// header from each peer during the normal "at tip" syncing.
	b.newHeadersSignal.L.Lock()
	for !b.IsCurrent() {
		b.newHeadersSignal.Wait()

		// While we're awake, we'll quickly check to see if we need to
		// quit early.
		select {
		case <-b.quit:
			b.newHeadersSignal.L.Unlock()
			return
		default:
		}
	}
	b.newHeadersSignal.L.Unlock()

	// Now that we know the header sync is mostly finished, we'll grab the
	// current chain tip so we can base our header sync off of that.
	lastHeader, lastHeight, err := b.server.BlockHeaders.ChainTip()
	if err != nil {
		log.Critical(err)
		return
	}
	lastHash := lastHeader.BlockHash()

	log.Infof("Starting cfheaders sync at block_height=%v, hash=%v", lastHeight,
		lastHeader.BlockHash())

	// We'll sync the headers and checkpoints for all filter types in
	// parallel, by using a goroutine for each filter type.
	var wg sync.WaitGroup
	wg.Add(len(filterTypes))
	for fType, storeLookup := range filterTypes {
		// Launch a goroutine to get all of the filter headers for this
		// filter type.
		//
		// NOTE: fType and storeLookup are passed as arguments to
		// avoid the shared-loop-variable capture problem.
		go func(fType wire.FilterType, storeLookup func(
			s *ChainService) *headerfs.FilterHeaderStore) {

			defer wg.Done()

			log.Infof("Starting cfheaders sync for "+
				"filter_type=%v", fType)

			var (
				goodCheckpoints []*chainhash.Hash
				err             error
			)

			// Get the header store for this filter type.
			store := storeLookup(b.server)

			// We're current, as we waited on newHeadersSignal
			// above. If we have less than a full checkpoint's
			// worth of blocks, such as on simnet, we don't really
			// need to request checkpoints as we'll get 0 from all
			// peers. We can go on and just request the cfheaders.
			for len(goodCheckpoints) == 0 &&
				lastHeight >= wire.CFCheckptInterval {

				// Quit if requested.
				select {
				case <-b.quit:
					return
				default:
				}

				// Try to get all checkpoints from current
				// peers.
				allCheckpoints := b.getCheckpts(&lastHash, fType)
				if len(allCheckpoints) == 0 {
					log.Warnf("Unable to fetch set of " +
						"candidate checkpoints, trying again...")

					time.Sleep(QueryTimeout)
					continue
				}

				// See if we can detect which checkpoint list
				// is correct. If not, we will cycle again.
				goodCheckpoints, err = b.resolveConflict(
					allCheckpoints, store, fType,
				)
				if err != nil {
					log.Debugf("got error attempting "+
						"to determine correct cfheader"+
						" checkpoints: %v, trying "+
						"again", err)
				}
				if len(goodCheckpoints) == 0 {
					time.Sleep(QueryTimeout)
				}
			}

			// Get all the headers up to the last known good
			// checkpoint.
			b.getCheckpointedCFHeaders(
				goodCheckpoints, store, fType,
			)

			log.Infof("Fully caught up with cfheaders at height "+
				"%v, waiting at tip for new blocks", lastHeight)

			// Now that we've been fully caught up to the tip of
			// the current header chain, we'll wait here for a
			// signal that more blocks have been connected. If this
			// happens then we'll do another round to fetch the new
			// set of filter headers.
			for {
				// We'll wait until the filter header tip and
				// the header tip are mismatched.
				//
				// NOTE: We can grab the filterHeaderTipHash
				// here without a lock, as this is the only
				// goroutine that can modify this value.
				b.newHeadersSignal.L.Lock()
				for b.filterHeaderTipHash == b.headerTipHash {
					// We'll wait here until we're woken up
					// by the broadcast signal.
					b.newHeadersSignal.Wait()

					// Before we proceed, we'll check if we
					// need to exit at all.
					select {
					case <-b.quit:
						b.newHeadersSignal.L.Unlock()
						return
					default:
					}
				}
				b.newHeadersSignal.L.Unlock()

				// At this point, we know that there're a set
				// of new filter headers to fetch, so we'll
				// grab them now.
				if err = b.getUncheckpointedCFHeaders(
					store, fType,
				); err != nil {
					log.Debugf("couldn't get "+
						"uncheckpointed headers for "+
						"%v: %v", fType, err)
				}

				// Quit if requested.
				select {
				case <-b.quit:
					return
				default:
				}
			}
		}(fType, storeLookup)
	}
	wg.Wait()
}
// getUncheckpointedCFHeaders gets the next batch of cfheaders from the
// network, if it can, and resolves any conflicts between them. It then writes
// any verified headers to the store.
func (b *blockManager) getUncheckpointedCFHeaders(
	store *headerfs.FilterHeaderStore, fType wire.FilterType) error {

	// Get the filter header store's chain tip.
	_, filtHeight, err := store.ChainTip()
	if err != nil {
		return fmt.Errorf("error getting filter chain tip: %v", err)
	}
	blockHeader, blockHeight, err := b.server.BlockHeaders.ChainTip()
	if err != nil {
		return fmt.Errorf("error getting block chain tip: %v", err)
	}

	// If the block height is somehow before the filter height, then this
	// means that we may still be handling a re-org, so we'll bail out so
	// we can retry after a timeout.
	//
	// BUG FIX: the original message was missing the closing parenthesis.
	if blockHeight < filtHeight {
		return fmt.Errorf("reorg in progress, waiting to get "+
			"uncheckpointed cfheaders (block height %d, filter "+
			"height %d)", blockHeight, filtHeight)
	}

	// If the heights match, then we're fully synced, so we don't need to
	// do anything from there.
	if blockHeight == filtHeight {
		log.Tracef("cfheaders already caught up to blocks")
		return nil
	}

	log.Infof("Attempting to fetch set of un-checkpointed filters "+
		"at height=%v, hash=%v", blockHeight, blockHeader.BlockHash())

	// Query all peers for the responses.
	startHeight := filtHeight + 1
	headers := b.getCFHeadersForAllPeers(startHeight, fType)
	if len(headers) == 0 {
		return fmt.Errorf("couldn't get cfheaders from peers")
	}

	// For each header, go through and check whether all headers messages
	// have the same filter hash. If we find a difference, get the block,
	// calculate the filter, and throw out any mismatching peers.
	for i := 0; i < wire.MaxCFHeadersPerMsg; i++ {
		if checkForCFHeaderMismatch(headers, i) {
			targetHeight := startHeight + uint32(i)

			log.Warnf("Detected cfheader mismatch at "+
				"height=%v!!!", targetHeight)

			// Get the block header for this height, along with the
			// block as well.
			header, err := b.server.BlockHeaders.FetchHeaderByHeight(
				targetHeight,
			)
			if err != nil {
				return err
			}
			block, err := b.server.GetBlock(header.BlockHash())
			if err != nil {
				return err
			}

			log.Warnf("Attempting to reconcile cfheader mismatch "+
				"amongst %v peers", len(headers))

			// We'll also fetch each of the filters from the peers
			// that reported check points, as we may need this in
			// order to determine which peers are faulty.
			filtersFromPeers := b.fetchFilterFromAllPeers(
				targetHeight, header.BlockHash(), fType,
			)
			badPeers, err := resolveCFHeaderMismatch(
				block.MsgBlock(), fType, filtersFromPeers,
			)
			if err != nil {
				return err
			}

			log.Warnf("Banning %v peers due to invalid filter "+
				"headers", len(badPeers))

			for _, peer := range badPeers {
				log.Infof("Banning peer=%v for invalid filter "+
					"headers", peer)

				sp := b.server.PeerByAddr(peer)
				if sp != nil {
					b.server.BanPeer(sp)
					sp.Disconnect()
				}
				delete(headers, peer)
			}
		}
	}

	// Get the longest filter hash chain and write it to the store.
	key, maxLen := "", 0
	for peer, msg := range headers {
		if len(msg.FilterHashes) > maxLen {
			key, maxLen = peer, len(msg.FilterHashes)
		}
	}

	// We'll now fetch the set of pristine headers from the map. If ALL the
	// peers were banned, then we won't have a set of headers at all. We'll
	// return an error so we can go to the top of the loop and fetch from a
	// new set of peers.
	//
	// BUG FIX: Go error strings should not be capitalized or end with
	// punctuation, as they're typically wrapped by callers.
	pristineHeaders, ok := headers[key]
	if !ok {
		return fmt.Errorf("all peers served bogus headers, retrying " +
			"with new set")
	}

	_, err = b.writeCFHeadersMsg(pristineHeaders, store)
	return err
}
// getCheckpointedCFHeaders catches a filter header store up with the
// checkpoints we got from the network. It assumes that the filter header store
// matches the checkpoints up to the tip of the store.
func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash,
	store *headerfs.FilterHeaderStore, fType wire.FilterType) {

	// We keep going until we've caught up the filter header store with the
	// latest known checkpoint.
	curHeader, curHeight, err := store.ChainTip()
	if err != nil {
		panic("getting chaintip from store")
	}
	initialFilterHeader := curHeader

	log.Infof("Fetching set of checkpointed cfheaders filters from "+
		"height=%v, hash=%v", curHeight, curHeader)

	// The starting interval is the checkpoint index that we'll be starting
	// from based on our current height in the filter header index.
	startingInterval := curHeight / wire.CFCheckptInterval

	log.Infof("Starting to query for cfheaders from "+
		"checkpoint_interval=%v", startingInterval)

	queryMsgs := make([]wire.Message, 0, len(checkpoints))

	// We'll also create an additional set of maps that we'll use to
	// re-order the responses as we get them in.
	queryResponses := make(map[uint32]*wire.MsgCFHeaders)
	stopHashes := make(map[chainhash.Hash]uint32)

	// Generate all of the requests we'll be batching and space to store
	// the responses. Also make a map of stophash to index to make it
	// easier to match against incoming responses.
	//
	// TODO(roasbeef): extract to func to test
	currentInterval := startingInterval
	for currentInterval < uint32(len(checkpoints)) {
		// Each checkpoint is spaced wire.CFCheckptInterval after the
		// prior one, so we'll fetch headers in batches using the
		// checkpoints as a guide.
		startHeightRange := uint32(
			currentInterval*wire.CFCheckptInterval,
		) + 1
		endHeightRange := uint32(
			(currentInterval + 1) * wire.CFCheckptInterval,
		)

		log.Tracef("Checkpointed cfheaders request start_range=%v, "+
			"end_range=%v", startHeightRange, endHeightRange)

		// In order to fetch the range, we'll need the block header for
		// the end of the height range.
		stopHeader, err := b.server.BlockHeaders.FetchHeaderByHeight(
			endHeightRange,
		)
		if err != nil {
			// Try to recover this by retrying the same interval
			// after a timeout, unless we're shutting down.
			select {
			case <-b.quit:
				return
			default:
				currentInterval--
				time.Sleep(QueryTimeout)
				continue
			}
		}
		stopHash := stopHeader.BlockHash()

		// Once we have the stop hash, we can construct the query
		// message itself.
		queryMsg := wire.NewMsgGetCFHeaders(
			fType, uint32(startHeightRange), &stopHash,
		)

		// We'll mark that the ith interval is queried by this message,
		// and also map the stop hash back to the index of this
		// message.
		queryMsgs = append(queryMsgs, queryMsg)
		stopHashes[stopHash] = currentInterval

		// With the queries for this interval constructed, we'll move
		// onto the next one.
		currentInterval++
	}

	log.Infof("Attempting to query for %v cfheader batches", len(queryMsgs))

	// With the set of messages constructed, we'll now request the batch
	// all at once. This message will distribute the header requests
	// amongst all active peers, effectively sharding each query
	// dynamically.
	b.server.queryBatch(
		queryMsgs,

		// Callback to process potential replies. Always called from
		// the same goroutine as the outer function, so we don't have
		// to worry about synchronization.
		func(sp *ServerPeer, query wire.Message,
			resp wire.Message) bool {

			r, ok := resp.(*wire.MsgCFHeaders)
			if !ok {
				// We are only looking for cfheaders messages.
				return false
			}

			q, ok := query.(*wire.MsgGetCFHeaders)
			if !ok {
				// We sent a getcfheaders message, so that's
				// what we should be comparing against.
				return false
			}

			// The response doesn't match the query.
			if q.FilterType != r.FilterType ||
				q.StopHash != r.StopHash {
				return false
			}

			checkPointIndex, ok := stopHashes[r.StopHash]
			if !ok {
				// We never requested a matching stop hash.
				return false
			}

			// The response doesn't match the checkpoint.
			//
			// BUG FIX: the original format string had a %v verb
			// but no corresponding argument (a go vet printf
			// error), so the index was never printed.
			if !verifyCheckpoint(checkpoints[checkPointIndex], r) {
				log.Warnf("Checkpoints at index %v don't "+
					"match response!!!", checkPointIndex)
				return false
			}

			// At this point, the response matches the query, and
			// the relevant checkpoint we got earlier, so we should
			// always return true so that the peer looking for the
			// answer to this query can move on to the next query.
			// We still have to check that these headers are next
			// before we write them; otherwise, we cache them if
			// they're too far ahead, or discard them if we don't
			// need them.

			// Find the first and last height for the blocks
			// represented by this message.
			startHeight := checkPointIndex*wire.CFCheckptInterval + 1
			lastHeight := startHeight + wire.CFCheckptInterval

			log.Debugf("Got cfheaders from height=%v to height=%v",
				startHeight, lastHeight)

			// If this is out of order but not yet written, we can
			// verify that the checkpoints match, and then store
			// them.
			if startHeight > curHeight+1 {
				log.Debugf("Got response for headers at "+
					"height=%v, only at height=%v, stashing",
					startHeight, curHeight)

				queryResponses[checkPointIndex] = r

				return true
			}

			// If this is out of order stuff that's already been
			// written, we can ignore it.
			if lastHeight <= curHeight {
				log.Debugf("Received out of order reply "+
					"end_height=%v, already written", lastHeight)
				return true
			}

			// If this is the very first range we've requested, we
			// may already have a portion of the headers written to
			// disk.
			//
			// TODO(roasbeef): can eventually special case handle
			// this at the top
			if bytes.Equal(curHeader[:], initialFilterHeader[:]) {
				// So we'll set the prev header to our best
				// known header, and seek within the header
				// range a bit so we don't write any duplicate
				// headers.
				r.PrevFilterHeader = *curHeader
				offset := curHeight + 1 - startHeight
				r.FilterHashes = r.FilterHashes[offset:]
			}

			curHeader, err = b.writeCFHeadersMsg(r, store)
			if err != nil {
				panic(fmt.Sprintf("couldn't write cfheaders "+
					"msg: %v", err))
			}

			// Then, we cycle through any cached messages, adding
			// them to the batch and deleting them from the cache.
			for {
				checkPointIndex++

				// We'll also update the current height of the
				// last written set of cfheaders.
				curHeight = checkPointIndex * wire.CFCheckptInterval

				// Break if we've gotten to the end of the
				// responses or we don't have the next one.
				if checkPointIndex >= uint32(len(queryResponses)) {
					break
				}

				// If we don't yet have the next response, then
				// we'll break out so we can wait for the peers
				// to respond with this message.
				r := queryResponses[checkPointIndex]
				if r == nil {
					break
				}

				// We have another response to write, so delete
				// it from the cache and write it.
				queryResponses[checkPointIndex] = nil

				log.Debugf("Writing cfheaders at height=%v to "+
					"next checkpoint", curHeight)

				// As we write the set of headers to disk, we
				// also obtain the hash of the last filter
				// header we've written to disk so we can
				// properly set the PrevFilterHeader field of
				// the next message.
				curHeader, err = b.writeCFHeadersMsg(r, store)
				if err != nil {
					panic(fmt.Sprintf("couldn't write "+
						"cfheaders msg: %v", err))
				}
			}

			return true
		},

		// Same quit channel we're watching.
		b.quit,
	)
}
// writeCFHeadersMsg writes a cfheaders message to the specified store. It
// assumes that everything is being written in order. The hints are required to
// store the correct block heights for the filters. We also return final
// constructed cfheader in this range as this lets callers populate the prev
// filter header field in the next message range before writing to disk.
func (b *blockManager) writeCFHeadersMsg(msg *wire.MsgCFHeaders,
store *headerfs.FilterHeaderStore) (*chainhash.Hash, error) {
b.newFilterHeadersMtx.Lock()
defer b.newFilterHeadersMtx.Unlock()
// Check that the PrevFilterHeader is the same as the last stored so we
// can prevent misalignment.
tip, _, err := store.ChainTip()
if err != nil {
return nil, err
}
if *tip != msg.PrevFilterHeader {
return nil, fmt.Errorf("attempt to write cfheaders out of order")
}
// Cycle through the headers and compute each header based on the prev
// header and the filter hash from the cfheaders response entries.
lastHeader := msg.PrevFilterHeader
headerBatch := make([]headerfs.FilterHeader, 0, wire.CFCheckptInterval)
for _, hash := range msg.FilterHashes {
// header = dsha256(filterHash || prevHeader)
lastHeader = chainhash.DoubleHashH(
append(hash[:], lastHeader[:]...),
)
headerBatch = append(headerBatch, headerfs.FilterHeader{
FilterHash: lastHeader,
})
}
numHeaders := len(headerBatch)
// We'll now query for the set of block headers which match each of
// these filters headers in their corresponding chains. Our query will
// return the headers for the entire checkpoint interval ending at the
// designated stop hash.
blockHeaders := b.server.BlockHeaders
matchingBlockHeaders, startHeight, err := blockHeaders.FetchHeaderAncestors(
uint32(numHeaders-1), &msg.StopHash,
)
if err != nil {
return nil, err
}
// The final height in our range will be offset to the end of this
// particular checkpoint interval.
lastHeight := startHeight + uint32(numHeaders) - 1
lastBlockHeader := matchingBlockHeaders[numHeaders-1]
// We only need to set the height and hash of the very last filter
// header in the range to ensure that the index properly updates the
// tip of the chain.
headerBatch[numHeaders-1].HeaderHash = lastBlockHeader.BlockHash()
headerBatch[numHeaders-1].Height = lastHeight
log.Debugf("Writing filter headers up to height=%v", lastHeight)
// Write the header batch.
err = store.WriteHeaders(headerBatch...)
if err != nil {
return nil, err
}
// Notify subscribers, and also update the filter header progress
// logger at the same time.
msgType := connectBasic
for i, header := range matchingBlockHeaders {
header := header
headerHeight := startHeight + uint32(i)