Skip to content

Commit

Permalink
Handle new head in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
hiepnv90 committed Nov 14, 2023
1 parent 92cf76e commit c7e99c8
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 72 deletions.
62 changes: 30 additions & 32 deletions pkg/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
const (
bufLen = 10000

maxQueueLen = 256

metricNameLastReceivedBlockNumber = "evmlistener_last_received_block_number"
metricNameLastCheckedBlockNumber = "evmlistener_last_checked_block_number"
metricNameLastHandledBlockNumber = "evmlistener_last_handled_block_number"
Expand Down Expand Up @@ -62,35 +64,34 @@ func New(

sanityEVMClient: sanityEVMClient,
sanityCheckInterval: sanityCheckInterval,

queue: NewQueue(maxQueueLen),
maxQueueLen: maxQueueLen,
}
}

func (l *Listener) publishBlock(ch chan<- types.Block, block *types.Block) {
func (l *Listener) publishBlock(ch chan<- types.Block, seq uint64, block *types.Block) {
if l.queue == nil {
ch <- *block

return
}

baseBlockNumber := l.queue.BlockNumber()
blockNumber := block.Number.Uint64()

if blockNumber < baseBlockNumber {
ch <- *block

expectedSeq := l.queue.SequenceNumber()
if seq < expectedSeq {
return
}

if int(blockNumber-baseBlockNumber) >= l.maxQueueLen {
for i := 0; i <= int(blockNumber-baseBlockNumber)-l.maxQueueLen; i++ {
if int(seq-expectedSeq) >= l.maxQueueLen {
for i := 0; i <= int(seq-expectedSeq)-l.maxQueueLen; i++ {
b, _ := l.queue.Dequeue()
if b != nil {
ch <- *b
}
}
}

l.queue.Insert(block)
l.queue.Insert(seq, block)
for !l.queue.Empty() {
b, _ := l.queue.Peek()
if b == nil {
Expand Down Expand Up @@ -192,7 +193,7 @@ func (l *Listener) handleOldHeaders(ctx context.Context, blockCh chan<- types.Bl
}

for i := range blocks {
l.publishBlock(blockCh, &blocks[i])
blockCh <- blocks[i]
}
}

Expand Down Expand Up @@ -220,6 +221,7 @@ func (l *Listener) subscribeNewBlockHead(ctx context.Context, blockCh chan<- typ
return err
}

seq := uint64(1)
for {
select {
case <-ctx.Done():
Expand All @@ -233,20 +235,27 @@ func (l *Listener) subscribeNewBlockHead(ctx context.Context, blockCh chan<- typ
case header := <-headerCh:
l.l.Debugw("Receive new head of the chain", "header", header)

b, err := l.handleNewHeader(ctx, header)
if err != nil {
l.l.Errorw("Fail to handle new head", "header", header, "error", err)

return err
}

l.mu.Lock()
if l.lastReceivedBlock == nil || l.lastReceivedBlock.Timestamp < b.Timestamp {
l.lastReceivedBlock = &b
if l.lastReceivedBlock == nil || l.lastReceivedBlock.Timestamp < header.Time {
l.lastReceivedBlock = &types.Block{
Number: header.Number,
Hash: header.Hash,
Timestamp: header.Time,
ParentHash: header.ParentHash,
}
}
l.mu.Unlock()

l.publishBlock(blockCh, &b)
go func(seq uint64, head *types.Header) {
b, err := l.handleNewHeader(ctx, head)
if err != nil {
l.l.Fatalw("Fail to handle new head", "header", header, "error", err)
}

l.publishBlock(blockCh, seq, &b)
}(seq, header)

seq++
}
}
}
Expand Down Expand Up @@ -284,17 +293,6 @@ func (l *Listener) Run(ctx context.Context) error {
return err
}

if l.queue != nil {
head, err := l.handler.blockKeeper.Head()
if err != nil {
l.l.Errorw("Fail to get block head", "error", err)

return err
}

l.queue.SetBlockNumber(head.Number.Uint64() + 1)
}

l.setResuming(true)

// Start go routine for sanity checking.
Expand Down
47 changes: 20 additions & 27 deletions pkg/listener/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ type Queue struct {
maxSize int
size int

blockNumber uint64
mu sync.Mutex
seq uint64
mu sync.Mutex
}

// NewQueue instantiates a new empty queue with the specified size of maximum number of elements that it can hold.
Expand All @@ -26,7 +26,7 @@ func NewQueue(maxSize int) *Queue {
panic("Invalid maxSize, should be at least 1")
}

queue := &Queue{maxSize: maxSize}
queue := &Queue{maxSize: maxSize, seq: 1}
queue.clear()

return queue
Expand All @@ -37,38 +37,31 @@ func (q *Queue) insertAt(value *types.Block, idx int) {
q.size++
}

func (q *Queue) insert(value *types.Block) {
blockNumber := value.Number.Uint64()
if q.blockNumber == 0 {
q.blockNumber = blockNumber
q.insertAt(value, 0)

return
}

if blockNumber < q.blockNumber {
func (q *Queue) insert(seq uint64, value *types.Block) {
if seq < q.seq {
return
}

if q.isFull() {
q.dequeue()
}

if int(blockNumber-q.blockNumber) >= q.maxSize {
for i := 0; i <= int(blockNumber-q.blockNumber)-q.maxSize; i++ {
// Ignore missing values at the start of queue.
if int(seq-q.seq) >= q.maxSize {
for i := 0; i <= int(seq-q.seq)-q.maxSize; i++ {
q.dequeue()
}
}

q.insertAt(value, int(blockNumber-q.blockNumber))
q.insertAt(value, int(seq-q.seq))
}

// Insert inserts new block into queue relative to current block number.
func (q *Queue) Insert(value *types.Block) {
// Insert inserts new block into queue relative to current seq.
func (q *Queue) Insert(seq uint64, value *types.Block) {
q.mu.Lock()
defer q.mu.Unlock()

q.insert(value)
q.insert(seq, value)
}

func (q *Queue) dequeue() (*types.Block, bool) {
Expand All @@ -88,7 +81,7 @@ func (q *Queue) dequeue() (*types.Block, bool) {
if q.start >= q.maxSize {
q.start = 0
}
q.blockNumber++
q.seq++

return value, ok
}
Expand Down Expand Up @@ -196,26 +189,26 @@ func (q *Queue) String() string {
qValues := q.Values()
values := make([]string, 0, len(qValues))
for _, value := range qValues {
values = append(values, fmt.Sprintf("%v", value))
values = append(values, fmt.Sprintf("%+v", value))
}

str += strings.Join(values, ", ")

return str
}

// BlockNumber returns base block number of queue.
func (q *Queue) BlockNumber() uint64 {
// SequenceNumber returns current sequence number of queue.
func (q *Queue) SequenceNumber() uint64 {
q.mu.Lock()
defer q.mu.Unlock()

return q.blockNumber
return q.seq
}

// SetBlockNumber sets base block number of queue.
func (q *Queue) SetBlockNumber(number uint64) {
// SetSequenceNumber sets current sequence number of queue.
func (q *Queue) SetSequenceNumber(seq uint64) {
q.mu.Lock()
defer q.mu.Unlock()

q.blockNumber = number
q.seq = seq
}
37 changes: 24 additions & 13 deletions pkg/listener/queue_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package listener

import (
"fmt"
"math/big"
"testing"

Expand Down Expand Up @@ -40,59 +41,69 @@ func (ts *QueueTestSuite) TestInsertDequeue() {
}

for _, block := range blocks {
ts.queue.Insert(block)
ts.queue.Insert(block.Number.Uint64()-8, block)
fmt.Println(ts.queue.String())
}

ts.Require().Equal(uint64(10), ts.queue.BlockNumber())
ts.Require().Equal(3, ts.queue.Size())
ts.Require().Equal(uint64(1), ts.queue.SequenceNumber())
ts.Require().Equal(4, ts.queue.Size())

values := ts.queue.Values()
ts.Require().Equal(3, len(values))
ts.Require().Equal(4, len(values))

ts.Assert().Equal(blocks[0], values[0])
ts.Assert().Equal(blocks[1], values[1])
ts.Assert().Equal(blocks[3], values[2])
ts.Assert().Equal(blocks[2], values[0])
ts.Assert().Equal(blocks[0], values[1])
ts.Assert().Equal(blocks[1], values[2])
ts.Assert().Equal(blocks[3], values[3])

value, ok := ts.queue.Dequeue()
ts.Assert().True(ok)
if ts.Assert().NotNil(value) {
ts.Assert().Equal(blocks[2], value)
}
ts.Assert().Equal(uint64(2), ts.queue.SequenceNumber())
ts.Assert().Equal(3, ts.queue.Size())

value, ok = ts.queue.Dequeue()
ts.Assert().True(ok)
if ts.Assert().NotNil(value) {
ts.Assert().Equal(blocks[0], value)
}
ts.Assert().Equal(uint64(11), ts.queue.BlockNumber())
ts.Assert().Equal(uint64(3), ts.queue.SequenceNumber())
ts.Assert().Equal(2, ts.queue.Size())

value, ok = ts.queue.Dequeue()
ts.Assert().True(ok)
if ts.Assert().NotNil(value) {
ts.Assert().Equal(blocks[1], value)
}
ts.Assert().Equal(uint64(12), ts.queue.BlockNumber())
ts.Assert().Equal(uint64(4), ts.queue.SequenceNumber())
ts.Assert().Equal(1, ts.queue.Size())

value, ok = ts.queue.Dequeue()
ts.Assert().False(ok)
ts.Assert().Nil(value)
ts.Assert().Equal(uint64(13), ts.queue.BlockNumber())
ts.Assert().Equal(uint64(5), ts.queue.SequenceNumber())
ts.Assert().Equal(1, ts.queue.Size())

value, ok = ts.queue.Dequeue()
ts.Assert().False(ok)
ts.Assert().Nil(value)
ts.Assert().Equal(uint64(14), ts.queue.BlockNumber())
ts.Assert().Equal(uint64(6), ts.queue.SequenceNumber())
ts.Assert().Equal(1, ts.queue.Size())

value, ok = ts.queue.Dequeue()
ts.Assert().False(ok)
ts.Assert().Nil(value)
ts.Assert().Equal(uint64(15), ts.queue.BlockNumber())
ts.Assert().Equal(uint64(7), ts.queue.SequenceNumber())
ts.Assert().Equal(1, ts.queue.Size())

value, ok = ts.queue.Dequeue()
ts.Assert().True(ok)
if ts.Assert().NotNil(value) {
ts.Assert().Equal(blocks[3], value)
}
ts.Assert().Equal(uint64(16), ts.queue.BlockNumber())
ts.Assert().Equal(uint64(8), ts.queue.SequenceNumber())
ts.Assert().Equal(0, ts.queue.Size())
}

Expand Down

0 comments on commit c7e99c8

Please sign in to comment.