Skip to content

Commit

Permalink
- Change publishing message format (#92)
Browse files Browse the repository at this point in the history
- Change publishing message format
- add message.proto
  • Loading branch information
Neokun authored Dec 12, 2023
1 parent ff3b8a6 commit 9cb0e2c
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 40 deletions.
51 changes: 25 additions & 26 deletions internal/publisher/data_center_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package publisher

import (
"context"
"strconv"

"github.com/KyberNetwork/evmlistener/pkg/common"
"github.com/KyberNetwork/evmlistener/pkg/pubsub"
Expand Down Expand Up @@ -38,40 +37,40 @@ func (p *DataCenterPublisher) Publish(ctx context.Context, msg types.Message) er
p.logger.Warnf("%d of blocks is re-orged", len(msg.RevertedBlocks))
}

for _, b := range msg.NewBlocks {
data, err := p.packMsgData(b)
if err != nil {
p.logger.Errorf("error on packing message to publish: %v", err)
data, err := p.packMsgData(msg)
if err != nil {
p.logger.Errorf("error on packing message to publish: %v", err)

return err
}
extra := p.extractExtraData(b)
return err
}

msgID, err := p.client.Publish(ctx, p.config.OrderingKey, data, extra)
if err != nil {
p.logger.Errorf("error publish block %d to pubsub: %v", b.Header.Number.Uint64(), err)
// extra := p.extractExtraData(b)

return err
}
msgID, err := p.client.Publish(ctx, p.config.OrderingKey, data, nil)
if err != nil {
p.logger.Errorf("error publish to pubsub: %v", err)

p.logger.Debugf("Done publish block %d with message id %s", b.Header.Number.Uint64(), msgID)
return err
}

return nil
}
p.logger.Debugf("Done publish %d new blocks, %d reverted blocks with message id %s",
len(msg.NewBlocks), len(msg.RevertedBlocks), msgID)

func (p *DataCenterPublisher) extractExtraData(block types.Block) map[string]string {
return map[string]string{
"block_number": block.Header.Number.String(),
"block_hash": block.Hash,
"parent_hash": block.Header.ParentHash,
"block_timestamp": strconv.Itoa(int(block.Header.Timestamp)),
}
return nil
}

func (p *DataCenterPublisher) packMsgData(b types.Block) ([]byte, error) {
block := b.ToProtobuf()
bytesData, err := proto.Marshal(block)
// func (p *DataCenterPublisher) extractExtraData(block types.Block) map[string]string {
// return map[string]string{
// "block_number": block.Header.Number.String(),
// "block_hash": block.Hash,
// "parent_hash": block.Header.ParentHash,
// "block_timestamp": strconv.Itoa(int(block.Header.Timestamp)),
// }
//}

func (p *DataCenterPublisher) packMsgData(msg types.Message) ([]byte, error) {
msgData := msg.ToProtobuf()
bytesData, err := proto.Marshal(msgData)
if err != nil {
p.logger.Errorf("marshal data to bytes err: %v", err)

Expand Down
28 changes: 14 additions & 14 deletions internal/publisher/data_center_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,16 @@ func TestDataCenterPublisher_Publish(t *testing.T) {
})
assert.NoError(t, err)

sender := make(chan *pb.Block)
sender := make(chan *pb.Message)
childCtx, done := context.WithCancel(ctx)
go getBlockFromSub(t, childCtx, sub, sender)
go getMsgFromSub(t, childCtx, sub, sender)

var blocks []*pb.Block
counter := 2
for {
select {
case b := <-sender:
blocks = append(blocks, b)
case msg := <-sender:
blocks = append(blocks, msg.NewBlocks...)
case <-time.After(1 * time.Second):
counter -= 1
if counter == 0 {
Expand All @@ -141,26 +141,26 @@ func TestDataCenterPublisher_Publish(t *testing.T) {
}
}

func getBlockFromSub(t *testing.T, ctx context.Context, sub *pubsub.Subscription, sender chan<- *pb.Block) {
func getMsgFromSub(t *testing.T, ctx context.Context, sub *pubsub.Subscription, sender chan<- *pb.Message) {
t.Helper()

_ = sub.Receive(ctx, func(ctx context.Context, message *pubsub.Message) {
assert.Len(t, message.Attributes, 4, "must contain extra info")
_, ok := message.Attributes["block_number"]
assert.True(t, ok)
_, ok = message.Attributes["block_hash"]
assert.True(t, ok)
// assert.Len(t, message.Attributes, 4, "must contain extra info")
// _, ok := message.Attributes["block_number"]
// assert.True(t, ok)
// _, ok = message.Attributes["block_hash"]
// assert.True(t, ok)

data, err := common.DecompressWithSizePrepended(message.Data)
assert.NoError(t, err)

var block pb.Block
err = proto.Unmarshal(data, &block)
var msg pb.Message
err = proto.Unmarshal(data, &msg)
assert.NoError(t, err)

sender <- &block
sender <- &msg
message.Ack()
t.Logf("got block %d", block.Number)
t.Logf("got blocks")
})
}

Expand Down
17 changes: 17 additions & 0 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,23 @@ type Message struct {
NewBlocks []Block `json:"newBlocks"`
}

func (m Message) ToProtobuf() *pb.Message {
revertedBlocks := make([]*pb.Block, len(m.RevertedBlocks))
for i, b := range m.RevertedBlocks {
revertedBlocks[i] = b.ToProtobuf()
}

newBlocks := make([]*pb.Block, len(m.NewBlocks))
for i, b := range m.NewBlocks {
newBlocks[i] = b.ToProtobuf()
}

return &pb.Message{
RevertedBlocks: revertedBlocks,
NewBlocks: newBlocks,
}
}

func (b *Block) ToProtobuf() *pb.Block {
if b == nil {
return nil
Expand Down
14 changes: 14 additions & 0 deletions protobuf/message.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";

package ethereum;

option go_package = "./pb";

import "ethereum.proto";

message Message {
// List of old blocks that are re-org.
repeated ethereum.Block reverted_blocks = 1;
// List of new blocks. Normally have size 1 if no re-org happened.
repeated ethereum.Block new_blocks = 2;
}
167 changes: 167 additions & 0 deletions protobuf/pb/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 9cb0e2c

Please sign in to comment.