Skip to content

Commit

Permalink
Merge main into release-v0.4 (#97)
Browse files Browse the repository at this point in the history
* allow option filter logs

* Check for websocket can receive block normally
  • Loading branch information
hiepnv90 authored Jan 9, 2024
1 parent e6dd809 commit 2c74692
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 61 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v3
with:
go-version: "1.21.x"
go-version: "1.21.4"
- name: Checkout
uses: actions/checkout@v3
- name: golangci-lint
Expand All @@ -103,7 +103,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v3
with:
go-version: "1.21.x"
go-version: "1.21.4"
- name: Run test
run: go test -race -v ./...

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/KyberNetwork/evmlistener

go 1.21.0

toolchain go1.21.1
toolchain go1.21.4

require (
github.com/KyberNetwork/kyber-trace-go v0.1.1
Expand Down
6 changes: 4 additions & 2 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,11 @@ func NewListener(c *cli.Context) (*listener.Listener, error) {
redisStream := redis.NewStream(redisClient, maxLen)

topic := c.String(publisherTopicFlag.Name)
handler := listener.NewHandler(l, topic, httpEVMClient, blockKeeper, redisStream)
handler := listener.NewHandler(l, topic, httpEVMClient, blockKeeper, redisStream,
listener.WithEventLogs(nil, nil))

return listener.New(l, wsEVMClient, httpEVMClient, handler, sanityEVMClient, sanityCheckInterval), nil
return listener.New(l, wsEVMClient, httpEVMClient, handler, sanityEVMClient, sanityCheckInterval,
listener.WithEventLogs(nil, nil)), nil
}

const (
Expand Down
14 changes: 9 additions & 5 deletions pkg/evmclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,11 +411,15 @@ func toEthereumFilterQuery(q FilterQuery) ethereum.FilterQuery {
addresses = append(addresses, ethcommon.HexToAddress(address))
}

topics := make([][]ethcommon.Hash, 0, len(q.Topics))
for i, ts := range q.Topics {
topics[i] = make([]ethcommon.Hash, 0, len(ts))
for _, t := range ts {
topics[i] = append(topics[i], ethcommon.HexToHash(t))
var topics [][]ethcommon.Hash
if len(q.Topics) > 0 {
topics = make([][]ethcommon.Hash, 0, len(q.Topics))
for _, ts := range q.Topics {
tps := make([]ethcommon.Hash, 0, len(ts))
for _, t := range ts {
tps = append(tps, ethcommon.HexToHash(t))
}
topics = append(topics, tps)
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/listener/evm_client_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ func (c *EVMClientMock) Next() {
defer c.mu.Unlock()

c.head++

currentHead := c.head
// Publish new head for subscriptions.
for _, ch := range c.subHeadChs {
go func(ch chan<- *types.Header) {
header := c.headerMap[c.sequence[c.head]]
header := c.headerMap[c.sequence[currentHead]]
ch <- &types.Header{
Hash: common.ToHex(header.Hash()),
ParentHash: common.ToHex(header.ParentHash),
Expand Down Expand Up @@ -108,10 +108,10 @@ func (c *EVMClientMock) SubscribeNewHead(ctx context.Context, ch chan<- *types.H
defer c.mu.Unlock()

c.subHeadChs = append(c.subHeadChs, ch)

currentHead := c.sequence[c.head]
// Publish current head to the channel.
go func() {
header := c.headerMap[c.sequence[c.head]]
header := c.headerMap[currentHead]
ch <- &types.Header{
Hash: common.ToHex(header.Hash()),
ParentHash: common.ToHex(header.ParentHash),
Expand Down
17 changes: 17 additions & 0 deletions pkg/listener/filter_option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package listener

type Option func(opt *FilterOption)

type FilterOption struct {
filterContracts []string
filterTopics [][]string
withLogs bool
}

func WithEventLogs(contracts []string, topics [][]string) Option {
return func(opt *FilterOption) {
opt.withLogs = true
opt.filterContracts = contracts
opt.filterTopics = topics
}
}
15 changes: 12 additions & 3 deletions pkg/listener/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,26 @@ type Handler struct {
blockKeeper block.Keeper
publisher pubsub.Publisher
l *zap.SugaredLogger
option *FilterOption
}

// NewHandler ...
func NewHandler(
l *zap.SugaredLogger, topic string, evmClient evmclient.IClient,
blockKeeper block.Keeper, publisher pubsub.Publisher,
blockKeeper block.Keeper, publisher pubsub.Publisher, options ...Option,
) *Handler {
var opts FilterOption
for _, v := range options {
v(&opts)
}

return &Handler{
topic: topic,
evmClient: evmClient,
blockKeeper: blockKeeper,
publisher: publisher,
l: l,
option: &opts,
}
}

Expand Down Expand Up @@ -79,7 +86,8 @@ func (h *Handler) Init(ctx context.Context) error {
fromBlock := toBlock - uint64(h.blockKeeper.Cap()) + 1

h.l.Infow("Get blocks from node", "from", fromBlock, "to", toBlock)
blocks, err := getBlocks(ctx, h.evmClient, fromBlock, toBlock)
blocks, err := GetBlocks(ctx, h.evmClient, fromBlock, toBlock, h.option.withLogs,
h.option.filterContracts, h.option.filterTopics)
if err != nil {
h.l.Errorw("Fail to get blocks", "from", fromBlock, "to", toBlock, "error", err)

Expand Down Expand Up @@ -112,7 +120,8 @@ func (h *Handler) getBlock(ctx context.Context, hash string) (types.Block, error
return types.Block{}, err
}

b, err = getBlockByHash(ctx, h.evmClient, hash)
b, err = getBlockByHash(ctx, h.evmClient, hash, h.option.withLogs, h.option.filterContracts,
h.option.filterTopics)
if err != nil {
h.l.Errorw("Fail to get block from ndoe", "hash", hash, "error", err)

Expand Down
8 changes: 4 additions & 4 deletions pkg/listener/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (ts *HandlerTestSuite) TestHandle() {
// Handle for normal block (chain was not re-organized).
ts.evmClient.Next()
hash := "0xc0c29448be86bca9d0db94b79cd1a6bd1361aed1e394d3a2a218fb98b159ab74"
b, err = getBlockByHash(context.Background(), ts.evmClient, hash)
b, err = getBlockByHash(context.Background(), ts.evmClient, hash, true, nil, nil)
ts.Require().NoError(err)

err = ts.handler.Handle(context.Background(), b)
Expand All @@ -93,7 +93,7 @@ func (ts *HandlerTestSuite) TestHandle() {
// Handle for far away block (lost connection).
ts.evmClient.SetHead(52)
hash = "0x132c1eb1799a5219b055674177ba95e946feb5f011c7c1409630d42c0581ee52"
b, err = getBlockByHash(context.Background(), ts.evmClient, hash)
b, err = getBlockByHash(context.Background(), ts.evmClient, hash, true, nil, nil)
ts.Require().NoError(err)

err = ts.handler.Handle(context.Background(), b)
Expand All @@ -113,7 +113,7 @@ func (ts *HandlerTestSuite) TestHandle() {
ts.Require().NoError(err)

hash = "0xfe5db0e13993eb721f8174edc783e92dcee70e5a2eb3cd87e8b6c7ba5ab24986"
b, err = getBlockByHash(context.Background(), ts.evmClient, hash)
b, err = getBlockByHash(context.Background(), ts.evmClient, hash, true, nil, nil)
ts.Require().NoError(err)

err = ts.handler.Handle(context.Background(), b)
Expand All @@ -140,7 +140,7 @@ func (ts *HandlerTestSuite) TestHandle() {

ts.evmClient.Next()
hash = "0x2394b0b03959156ec90096deadd34f68195a8d8f5f1e5438ea237be7675178c2"
b, err = getBlockByHash(context.Background(), ts.evmClient, hash)
b, err = getBlockByHash(context.Background(), ts.evmClient, hash, true, nil, nil)
ts.Require().NoError(err)

err = ts.handler.Handle(context.Background(), b)
Expand Down
71 changes: 50 additions & 21 deletions pkg/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ import (
)

const (
bufLen = 100

maxQueueLen = 256
bufLen = 100
maxQueueLen = 256
defaultSanityCheckInterval = time.Minute

metricNameLastReceivedBlockNumber = "evmlistener_last_received_block_number"
metricNameLastCheckedBlockNumber = "evmlistener_last_checked_block_number"
metricNameLastHandledBlockNumber = "evmlistener_last_handled_block_number"
)

var errConnectionCorrupted = errors.New("connection is corrupted")

// Listener represents a listener service for on-chain events.
type Listener struct {
l *zap.SugaredLogger
Expand All @@ -47,14 +49,25 @@ type Listener struct {

queue *Queue
maxQueueLen int

option FilterOption
}

// New ...
func New(
l *zap.SugaredLogger, wsEVMClient evmclient.IClient,
httpEVMClient evmclient.IClient, handler *Handler,
sanityEVMClient evmclient.IClient, sanityCheckInterval time.Duration,
sanityEVMClient evmclient.IClient, sanityCheckInterval time.Duration, opts ...Option,
) *Listener {
var o FilterOption
for _, v := range opts {
v(&o)
}

if sanityCheckInterval == 0 {
sanityCheckInterval = defaultSanityCheckInterval
}

return &Listener{
l: l,

Expand All @@ -67,6 +80,7 @@ func New(

queue: NewQueue(maxQueueLen),
maxQueueLen: maxQueueLen,
option: o,
}
}

Expand Down Expand Up @@ -108,14 +122,15 @@ func (l *Listener) handleNewHeader(ctx context.Context, header *types.Header) (t
var logs []types.Log

l.l.Debugw("Handle for new head", "hash", header.Hash)
opts := l.option
if opts.withLogs {
logs, err = getLogsByBlockHash(ctx, l.httpEVMClient, header.Hash, opts.filterContracts, opts.filterTopics)
if err != nil {
l.l.Errorw("Fail to get logs by block hash", "hash", header.Hash, "error", err)

logs, err = getLogsByBlockHash(ctx, l.httpEVMClient, header.Hash)
if err != nil {
l.l.Errorw("Fail to get logs by block hash", "hash", header.Hash, "error", err)

return types.Block{}, err
return types.Block{}, err
}
}

l.l.Debugw("Handle new head success", "hash", header.Hash)

return headerToBlock(header, logs), nil
Expand All @@ -129,7 +144,8 @@ func (l *Listener) getBlocks(ctx context.Context, fromBlock, toBlock uint64) ([]
i := i
blkNum := uint64(i) + fromBlock
g.Go(func() error {
block, err := getBlockByNumber(ctx, l.httpEVMClient, new(big.Int).SetUint64(blkNum))
block, err := getBlockByNumber(ctx, l.httpEVMClient, new(big.Int).SetUint64(blkNum),
l.option.withLogs, l.option.filterContracts, l.option.filterTopics)
if err != nil {
l.l.Errorw("Fail to get block by number", "number", blkNum, "error", err)

Expand Down Expand Up @@ -202,6 +218,7 @@ func (l *Listener) handleOldHeaders(ctx context.Context, blockCh chan<- types.Bl
return nil
}

//nolint:cyclop
func (l *Listener) subscribeNewBlockHead(ctx context.Context, blockCh chan<- types.Block) error {
l.l.Info("Start subscribing for new head of the chain")
headerCh := make(chan *types.Header, 1)
Expand All @@ -221,6 +238,10 @@ func (l *Listener) subscribeNewBlockHead(ctx context.Context, blockCh chan<- typ
return err
}

ticker := time.NewTicker(l.sanityCheckInterval)
defer ticker.Stop()

lastReceivedTime := time.Now()
seq := uint64(1)
for {
select {
Expand All @@ -232,9 +253,16 @@ func (l *Listener) subscribeNewBlockHead(ctx context.Context, blockCh chan<- typ
l.l.Errorw("Error while subscribing new head", "error", err)

return err
case <-ticker.C:
if time.Since(lastReceivedTime) > l.sanityCheckInterval {
l.l.Errorw("Websocket connection is corrupted", "lastReceivedTime", lastReceivedTime)

return errConnectionCorrupted
}
case header := <-headerCh:
l.l.Debugw("Receive new head of the chain", "header", header)

lastReceivedTime = time.Now()
l.mu.Lock()
if l.lastReceivedBlock == nil || l.lastReceivedBlock.Timestamp < header.Time {
l.lastReceivedBlock = &types.Block{
Expand Down Expand Up @@ -272,7 +300,8 @@ func (l *Listener) syncBlocks(ctx context.Context, blockCh chan types.Block) err
websocket.CloseNormalClosure, websocket.CloseServiceRestart) &&
!errors.Is(err, syscall.ECONNRESET) &&
!errors.Is(err, ethereum.NotFound) &&
err.Error() != errStringUnknownBlock {
err.Error() != errStringUnknownBlock &&
!errors.Is(err, errConnectionCorrupted) {
return err
}

Expand All @@ -286,11 +315,11 @@ func (l *Listener) Run(ctx context.Context) error {
defer l.l.Info("Stop listener service")

l.l.Info("Init handler")
err := l.handler.Init(ctx)
if err != nil {
l.l.Errorw("Fail to init handler", "error", err)
returnErr := l.handler.Init(ctx)
if returnErr != nil {
l.l.Errorw("Fail to init handler", "error", returnErr)

return err
return returnErr
}

l.setResuming(true)
Expand All @@ -306,9 +335,9 @@ func (l *Listener) Run(ctx context.Context) error {
// Synchronize blocks from node.
blockCh := make(chan types.Block, bufLen)
go func() {
err := l.syncBlocks(ctx, blockCh)
if err != nil {
l.l.Fatalw("Fail to sync blocks", "error", err)
returnErr = l.syncBlocks(ctx, blockCh)
if returnErr != nil {
l.l.Errorw("Fail to sync blocks", "error", returnErr)
}

close(blockCh)
Expand All @@ -328,7 +357,7 @@ func (l *Listener) Run(ctx context.Context) error {
for b := range blockCh {
l.l.Debugw("Receive new block",
"hash", b.Hash, "parent", b.ParentHash, "numLogs", len(b.Logs))
err = l.handler.Handle(ctx, b)
err := l.handler.Handle(ctx, b)
if err != nil {
l.l.Errorw("Fail to handle new block", "hash", b.Hash, "error", err)

Expand All @@ -340,7 +369,7 @@ func (l *Listener) Run(ctx context.Context) error {
l.mu.Unlock()
}

return nil
return returnErr
}

func (l *Listener) startMetricsCollector(_ context.Context) error {
Expand Down
Loading

0 comments on commit 2c74692

Please sign in to comment.