Skip to content

Commit

Permalink
Calculate max block proto file size in Mb
Browse files Browse the repository at this point in the history
  • Loading branch information
kompotkot committed Jul 9, 2024
1 parent 5a5e547 commit 4418236
Show file tree
Hide file tree
Showing 13 changed files with 87 additions and 51 deletions.
11 changes: 7 additions & 4 deletions blockchain/arbitrum_one/arbitrum_one.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,14 @@ func (c *Client) ParseEvents(from, to *big.Int, blocksCache map[uint64]indexer.B
return parsedEvents, eventsIndex, nil
}

func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, maxRequests int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, error) {
func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, maxRequests int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, uint64, error) {
blocks, err := c.ParseBlocksWithTransactions(from, to, debug, maxRequests)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, 0, err
}

var blocksSize uint64

blocksCache := make(map[uint64]indexer.BlockCache)

for _, block := range blocks {
Expand All @@ -361,7 +363,7 @@ func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, max

events, eventsIndex, err := c.ParseEvents(from, to, blocksCache, debug)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, 0, err
}

var blocksProto []proto.Message
Expand Down Expand Up @@ -409,10 +411,11 @@ func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, max
block.L1BlockNumber,
))

blocksSize += uint64(proto.Size(block))
blocksProto = append(blocksProto, block) // Assuming block is already a proto.Message
}

return blocksProto, blocksIndex, txsIndex, eventsIndex, nil
return blocksProto, blocksIndex, txsIndex, eventsIndex, blocksSize, nil
}

func (c *Client) ProcessBlocksToBatch(msgs []proto.Message) (proto.Message, error) {
Expand Down
11 changes: 7 additions & 4 deletions blockchain/arbitrum_sepolia/arbitrum_sepolia.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,14 @@ func (c *Client) ParseEvents(from, to *big.Int, blocksCache map[uint64]indexer.B
return parsedEvents, eventsIndex, nil
}

func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, maxRequests int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, error) {
func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, maxRequests int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, uint64, error) {
blocks, err := c.ParseBlocksWithTransactions(from, to, debug, maxRequests)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, 0, err
}

var blocksSize uint64

blocksCache := make(map[uint64]indexer.BlockCache)

for _, block := range blocks {
Expand All @@ -361,7 +363,7 @@ func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, max

events, eventsIndex, err := c.ParseEvents(from, to, blocksCache, debug)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, 0, err
}

var blocksProto []proto.Message
Expand Down Expand Up @@ -409,10 +411,11 @@ func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, max
block.L1BlockNumber,
))

blocksSize += uint64(proto.Size(block))
blocksProto = append(blocksProto, block) // Assuming block is already a proto.Message
}

return blocksProto, blocksIndex, txsIndex, eventsIndex, nil
return blocksProto, blocksIndex, txsIndex, eventsIndex, blocksSize, nil
}

func (c *Client) ProcessBlocksToBatch(msgs []proto.Message) (proto.Message, error) {
Expand Down
11 changes: 7 additions & 4 deletions blockchain/blockchain.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,14 @@ func (c *Client) ParseEvents(from, to *big.Int, blocksCache map[uint64]indexer.B
return parsedEvents, eventsIndex, nil
}

func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, maxRequests int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, error) {
func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, maxRequests int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, uint64, error) {
blocks, err := c.ParseBlocksWithTransactions(from, to, debug, maxRequests)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, 0, err
}

var blocksSize uint64

blocksCache := make(map[uint64]indexer.BlockCache)

for _, block := range blocks {
Expand All @@ -361,7 +363,7 @@ func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, max

events, eventsIndex, err := c.ParseEvents(from, to, blocksCache, debug)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, 0, err
}

var blocksProto []proto.Message
Expand Down Expand Up @@ -409,10 +411,11 @@ func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, max
{{if .IsSideChain -}}block.L1BlockNumber,{{else}}0,{{end}}
))

blocksSize += uint64(proto.Size(block))
blocksProto = append(blocksProto, block) // Assuming block is already a proto.Message
}

return blocksProto, blocksIndex, txsIndex, eventsIndex, nil
return blocksProto, blocksIndex, txsIndex, eventsIndex, blocksSize, nil
}

func (c *Client) ProcessBlocksToBatch(msgs []proto.Message) (proto.Message, error) {
Expand Down
11 changes: 7 additions & 4 deletions blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,14 @@ func (c *Client) ParseEvents(from, to *big.Int, blocksCache map[uint64]indexer.B
return parsedEvents, eventsIndex, nil
}

func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, maxRequests int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, error) {
func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, maxRequests int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, uint64, error) {
blocks, err := c.ParseBlocksWithTransactions(from, to, debug, maxRequests)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, 0, err
}

var blocksSize uint64

blocksCache := make(map[uint64]indexer.BlockCache)

for _, block := range blocks {
Expand All @@ -361,7 +363,7 @@ func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, max

events, eventsIndex, err := c.ParseEvents(from, to, blocksCache, debug)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, 0, err
}

var blocksProto []proto.Message
Expand Down Expand Up @@ -409,10 +411,11 @@ func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, max
0,
))

blocksSize += uint64(proto.Size(block))
blocksProto = append(blocksProto, block) // Assuming block is already a proto.Message
}

return blocksProto, blocksIndex, txsIndex, eventsIndex, nil
return blocksProto, blocksIndex, txsIndex, eventsIndex, blocksSize, nil
}

func (c *Client) ProcessBlocksToBatch(msgs []proto.Message) (proto.Message, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,14 @@ func (c *Client) ParseEvents(from, to *big.Int, blocksCache map[uint64]indexer.B
return parsedEvents, eventsIndex, nil
}

func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, maxRequests int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, error) {
func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, maxRequests int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, uint64, error) {
blocks, err := c.ParseBlocksWithTransactions(from, to, debug, maxRequests)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, 0, err
}

var blocksSize uint64

blocksCache := make(map[uint64]indexer.BlockCache)

for _, block := range blocks {
Expand All @@ -361,7 +363,7 @@ func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, max

events, eventsIndex, err := c.ParseEvents(from, to, blocksCache, debug)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, 0, err
}

var blocksProto []proto.Message
Expand Down Expand Up @@ -409,10 +411,11 @@ func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, max
block.L1BlockNumber,
))

blocksSize += uint64(proto.Size(block))
blocksProto = append(blocksProto, block) // Assuming block is already a proto.Message
}

return blocksProto, blocksIndex, txsIndex, eventsIndex, nil
return blocksProto, blocksIndex, txsIndex, eventsIndex, blocksSize, nil
}

func (c *Client) ProcessBlocksToBatch(msgs []proto.Message) (proto.Message, error) {
Expand Down
10 changes: 5 additions & 5 deletions blockchain/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,21 @@ type BlockData struct {

type BlockchainClient interface {
GetLatestBlockNumber() (*big.Int, error)
FetchAsProtoBlocksWithEvents(*big.Int, *big.Int, bool, int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, error)
FetchAsProtoBlocksWithEvents(*big.Int, *big.Int, bool, int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, uint64, error)
ProcessBlocksToBatch([]proto.Message) (proto.Message, error)
DecodeProtoEntireBlockToJson(*bytes.Buffer) (*seer_common.BlocksBatchJson, error)
DecodeProtoEntireBlockToLabels(*bytes.Buffer, map[uint64]uint64, map[string]map[string]map[string]string) ([]indexer.EventLabel, []indexer.TransactionLabel, error)
DecodeProtoTransactionsToLabels([]string, map[uint64]uint64, map[string]map[string]map[string]string) ([]indexer.TransactionLabel, error)
ChainType() string
}

func CrawlEntireBlocks(client BlockchainClient, startBlock *big.Int, endBlock *big.Int, debug bool, maxRequests int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, error) {
blocks, blocksIndex, txsIndex, eventsIndex, pBlockErr := client.FetchAsProtoBlocksWithEvents(startBlock, endBlock, debug, maxRequests)
func CrawlEntireBlocks(client BlockchainClient, startBlock *big.Int, endBlock *big.Int, debug bool, maxRequests int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, uint64, error) {
blocks, blocksIndex, txsIndex, eventsIndex, blocksSize, pBlockErr := client.FetchAsProtoBlocksWithEvents(startBlock, endBlock, debug, maxRequests)
if pBlockErr != nil {
return nil, nil, nil, nil, pBlockErr
return nil, nil, nil, nil, 0, pBlockErr
}

return blocks, blocksIndex, txsIndex, eventsIndex, nil
return blocks, blocksIndex, txsIndex, eventsIndex, blocksSize, nil
}

func DecodeTransactionInputData(contractABI *abi.ABI, data []byte) {
Expand Down
11 changes: 7 additions & 4 deletions blockchain/mantle/mantle.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,14 @@ func (c *Client) ParseEvents(from, to *big.Int, blocksCache map[uint64]indexer.B
return parsedEvents, eventsIndex, nil
}

func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, maxRequests int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, error) {
func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, maxRequests int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, uint64, error) {
blocks, err := c.ParseBlocksWithTransactions(from, to, debug, maxRequests)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, 0, err
}

var blocksSize uint64

blocksCache := make(map[uint64]indexer.BlockCache)

for _, block := range blocks {
Expand All @@ -361,7 +363,7 @@ func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, max

events, eventsIndex, err := c.ParseEvents(from, to, blocksCache, debug)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, 0, err
}

var blocksProto []proto.Message
Expand Down Expand Up @@ -409,10 +411,11 @@ func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, max
0,
))

blocksSize += uint64(proto.Size(block))
blocksProto = append(blocksProto, block) // Assuming block is already a proto.Message
}

return blocksProto, blocksIndex, txsIndex, eventsIndex, nil
return blocksProto, blocksIndex, txsIndex, eventsIndex, blocksSize, nil
}

func (c *Client) ProcessBlocksToBatch(msgs []proto.Message) (proto.Message, error) {
Expand Down
11 changes: 7 additions & 4 deletions blockchain/mantle_sepolia/mantle_sepolia.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,14 @@ func (c *Client) ParseEvents(from, to *big.Int, blocksCache map[uint64]indexer.B
return parsedEvents, eventsIndex, nil
}

func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, maxRequests int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, error) {
func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, maxRequests int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, uint64, error) {
blocks, err := c.ParseBlocksWithTransactions(from, to, debug, maxRequests)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, 0, err
}

var blocksSize uint64

blocksCache := make(map[uint64]indexer.BlockCache)

for _, block := range blocks {
Expand All @@ -361,7 +363,7 @@ func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, max

events, eventsIndex, err := c.ParseEvents(from, to, blocksCache, debug)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, 0, err
}

var blocksProto []proto.Message
Expand Down Expand Up @@ -409,10 +411,11 @@ func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, max
0,
))

blocksSize += uint64(proto.Size(block))
blocksProto = append(blocksProto, block) // Assuming block is already a proto.Message
}

return blocksProto, blocksIndex, txsIndex, eventsIndex, nil
return blocksProto, blocksIndex, txsIndex, eventsIndex, blocksSize, nil
}

func (c *Client) ProcessBlocksToBatch(msgs []proto.Message) (proto.Message, error) {
Expand Down
11 changes: 7 additions & 4 deletions blockchain/polygon/polygon.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,14 @@ func (c *Client) ParseEvents(from, to *big.Int, blocksCache map[uint64]indexer.B
return parsedEvents, eventsIndex, nil
}

func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, maxRequests int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, error) {
func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, maxRequests int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, uint64, error) {
blocks, err := c.ParseBlocksWithTransactions(from, to, debug, maxRequests)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, 0, err
}

var blocksSize uint64

blocksCache := make(map[uint64]indexer.BlockCache)

for _, block := range blocks {
Expand All @@ -361,7 +363,7 @@ func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, max

events, eventsIndex, err := c.ParseEvents(from, to, blocksCache, debug)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, 0, err
}

var blocksProto []proto.Message
Expand Down Expand Up @@ -409,10 +411,11 @@ func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, max
0,
))

blocksSize += uint64(proto.Size(block))
blocksProto = append(blocksProto, block) // Assuming block is already a proto.Message
}

return blocksProto, blocksIndex, txsIndex, eventsIndex, nil
return blocksProto, blocksIndex, txsIndex, eventsIndex, blocksSize, nil
}

func (c *Client) ProcessBlocksToBatch(msgs []proto.Message) (proto.Message, error) {
Expand Down
11 changes: 7 additions & 4 deletions blockchain/xai/xai.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,14 @@ func (c *Client) ParseEvents(from, to *big.Int, blocksCache map[uint64]indexer.B
return parsedEvents, eventsIndex, nil
}

func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, maxRequests int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, error) {
func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, maxRequests int) ([]proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, []indexer.LogIndex, uint64, error) {
blocks, err := c.ParseBlocksWithTransactions(from, to, debug, maxRequests)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, 0, err
}

var blocksSize uint64

blocksCache := make(map[uint64]indexer.BlockCache)

for _, block := range blocks {
Expand All @@ -361,7 +363,7 @@ func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, max

events, eventsIndex, err := c.ParseEvents(from, to, blocksCache, debug)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, 0, err
}

var blocksProto []proto.Message
Expand Down Expand Up @@ -409,10 +411,11 @@ func (c *Client) FetchAsProtoBlocksWithEvents(from, to *big.Int, debug bool, max
block.L1BlockNumber,
))

blocksSize += uint64(proto.Size(block))
blocksProto = append(blocksProto, block) // Assuming block is already a proto.Message
}

return blocksProto, blocksIndex, txsIndex, eventsIndex, nil
return blocksProto, blocksIndex, txsIndex, eventsIndex, blocksSize, nil
}

func (c *Client) ProcessBlocksToBatch(msgs []proto.Message) (proto.Message, error) {
Expand Down
Loading

0 comments on commit 4418236

Please sign in to comment.