Time and buffer size logic for crawler
kompotkot committed Jul 3, 2024
1 parent bac324c commit 22be85b
Showing 2 changed files with 78 additions and 41 deletions.
2 changes: 1 addition & 1 deletion cmd.go
@@ -278,7 +278,7 @@ func CreateCrawlerCommand() *cobra.Command {
crawlerCmd.Flags().StringVar(&baseDir, "base-dir", "", "The base directory to store the crawled data (default: '')")
crawlerCmd.Flags().BoolVar(&force, "force", false, "Set this flag to force the crawler to start from the specified block; otherwise it resumes from the latest indexed block number in the database (default: false)")
crawlerCmd.Flags().IntVar(&protoSizeLimit, "proto-size-limit", 100, "Proto file size limit in Mb (default: 100Mb)")
crawlerCmd.Flags().IntVar(&protoTimeLimit, "proto-time-limit", 5, "Proto time limit in minutes (default: 5min)")
crawlerCmd.Flags().IntVar(&protoTimeLimit, "proto-time-limit", 300, "Proto time limit in seconds (default: 300sec)")

return crawlerCmd
}
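The flag change above switches --proto-time-limit from minutes to seconds while keeping the same effective window (5 minutes = 300 seconds). A minimal sketch, assuming the value is converted with time.Duration as in the crawler.go diff below; the snippet itself is illustrative, not repository code:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Value of --proto-time-limit, now interpreted as seconds.
	protoTimeLimit := 300

	// Same conversion Start performs before comparing against the pack's start timestamp.
	protoDurationTimeLimit := time.Duration(protoTimeLimit) * time.Second

	fmt.Println(protoDurationTimeLimit) // prints "5m0s"
}
```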
117 changes: 77 additions & 40 deletions crawler/crawler.go
@@ -116,10 +116,54 @@ type BlocksBufferBatch struct {
Buffer bytes.Buffer
}

func (c *Crawler) PushPackOfData(blocksBufferPack *bytes.Buffer, blocksIndexPack []indexer.BlockIndex, txsIndexPack []indexer.TransactionIndex, eventsIndexPack []indexer.LogIndex, packStartBlock, packEndBlock int64) error {
packRange := fmt.Sprintf("%d-%d", packStartBlock, packEndBlock)

// Save proto data
if err := c.StorageInstance.Save(packRange, "data.proto", *blocksBufferPack); err != nil {
return fmt.Errorf("failed to save data.proto: %w", err)
}
log.Printf("Saved .proto blocks with transactions and events to %s", packRange)

// Save indexes data
var interfaceBlocksIndexPack []interface{}
for _, v := range blocksIndexPack {
v.Path = filepath.Join(c.basePath, packRange, "data.proto")
interfaceBlocksIndexPack = append(interfaceBlocksIndexPack, v)
}

var interfaceTxsIndexPack []interface{}
for _, v := range txsIndexPack {
v.Path = filepath.Join(c.basePath, packRange, "data.proto")
interfaceTxsIndexPack = append(interfaceTxsIndexPack, v)
}

var interfaceEventsIndexPack []interface{}
for _, v := range eventsIndexPack {
v.Path = filepath.Join(c.basePath, packRange, "data.proto")
interfaceEventsIndexPack = append(interfaceEventsIndexPack, v)
}

// TODO: Unite in one commit
if err := indexer.WriteIndexesToDatabase(c.blockchain, interfaceBlocksIndexPack, "block"); err != nil {
return fmt.Errorf("failed to write block index to database: %w", err)
}

if err := indexer.WriteIndexesToDatabase(c.blockchain, interfaceTxsIndexPack, "transaction"); err != nil {
return fmt.Errorf("failed to write transaction index to database: %w", err)
}

if err := indexer.WriteIndexesToDatabase(c.blockchain, interfaceEventsIndexPack, "log"); err != nil {
return fmt.Errorf("failed to write event index to database: %w", err)
}

return nil
}
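The three loops above do the same thing for blocks, transactions, and logs: stamp each index entry with the pack's data.proto path, then widen the typed slice to []interface{} for indexer.WriteIndexesToDatabase. A hedged sketch of how that repetition could be factored into one generic helper; the blockIndex stand-in type and the toInterfaceSlice helper are hypothetical, not part of the repository:

```go
package main

import (
	"fmt"
	"path/filepath"
)

// blockIndex is a stand-in for an index row such as indexer.BlockIndex (illustrative only).
type blockIndex struct {
	BlockNumber int64
	Path        string
}

// toInterfaceSlice widens a typed slice to []interface{}, letting the caller
// mutate each element first, which is the shape of the three loops above.
func toInterfaceSlice[T any](items []T, mutate func(*T)) []interface{} {
	out := make([]interface{}, 0, len(items))
	for i := range items {
		mutate(&items[i])
		out = append(out, items[i])
	}
	return out
}

func main() {
	pack := []blockIndex{{BlockNumber: 100}, {BlockNumber: 101}}
	path := filepath.Join("base", "100-101", "data.proto")

	rows := toInterfaceSlice(pack, func(b *blockIndex) { b.Path = path })
	fmt.Println(len(rows), rows[0].(blockIndex).Path) // 2 base/100-101/data.proto
}
```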

// Start initiates the crawling process for the configured blockchain.
func (c *Crawler) Start(threads int) {
// protoBufferSizeLimit := c.protoSizeLimit * 1024 * 1024 // In Mb
// protoDurationTimeLimit := time.Duration(c.protoTimeLimit) * time.Minute
protoBufferSizeLimit := c.protoSizeLimit * 1024 * 1024 // c.protoSizeLimit is in Mb, convert to bytes
protoDurationTimeLimit := time.Duration(c.protoTimeLimit) * time.Second

batchSize := int64(10)

@@ -148,8 +192,13 @@ func (c *Crawler) Start(threads int) {
}
}

// var blocksBufferBatch BlocksBufferBatch
// blocksBufferBatchEncoder := json.NewEncoder(&blocksBufferBatch.Buffer)
// Variables to accumulate packs before write
var blocksBufferPack bytes.Buffer
packCrawlStartTs := time.Now()
var blocksIndexPack []indexer.BlockIndex
var txsIndexPack []indexer.TransactionIndex
var eventsIndexPack []indexer.LogIndex
packStartBlock := c.startBlock

tempEndBlock := c.startBlock + batchSize
var safeBlock int64
@@ -201,55 +250,32 @@

// Retry the operation in case of failure with cumulative attempts
err = retryOperation(retryAttempts, retryWaitTime, func() error {
batchDir := fmt.Sprintf("%d-%d", c.startBlock, tempEndBlock)
log.Printf("Operates with batch of blocks: %s", batchDir)
log.Printf("Operates with batch of blocks: %d-%d", c.startBlock, tempEndBlock)

// Fetch blocks with transactions
blocks, blocksIndex, txsIndex, eventsIndex, crawlErr := seer_blockchain.CrawlEntireBlocks(c.Client, big.NewInt(c.startBlock), big.NewInt(tempEndBlock), SEER_CRAWLER_DEBUG, threads)
if crawlErr != nil {
return fmt.Errorf("failed to crawl blocks, txs and events: %w", err)
}

// Save .proto data
var blocksBuffer bytes.Buffer
// Append .proto data to buffer
for _, block := range blocks {
protodelim.MarshalTo(&blocksBuffer, block)
}
if err := c.StorageInstance.Save(batchDir, "data.proto", blocksBuffer); err != nil {
return fmt.Errorf("failed to save data.proto: %w", err)
}
log.Printf("Saved .proto blocks with transactions and events to %s", batchDir)

// Save indexes data
interfaceBlockIndex := make([]interface{}, len(blocksIndex))
for i, v := range blocksIndex {
v.Path = filepath.Join(c.basePath, batchDir, "data.proto")
interfaceBlockIndex[i] = v
protodelim.MarshalTo(&blocksBufferPack, block)
}

interfaceTransactionIndex := make([]interface{}, len(txsIndex))
for i, v := range txsIndex {
v.Path = filepath.Join(c.basePath, batchDir, "data.proto")
interfaceTransactionIndex[i] = v
}

interfaceEventsIndex := make([]interface{}, len(eventsIndex))
for i, v := range eventsIndex {
v.Path = filepath.Join(c.basePath, batchDir, "data.proto")
interfaceEventsIndex[i] = v
}
blocksIndexPack = append(blocksIndexPack, blocksIndex...)
txsIndexPack = append(txsIndexPack, txsIndex...)
eventsIndexPack = append(eventsIndexPack, eventsIndex...)

// TODO: Unite in one commit
if err := indexer.WriteIndexesToDatabase(c.blockchain, interfaceBlockIndex, "block"); err != nil {
return fmt.Errorf("failed to write block index to database: %w", err)
}
if blocksBufferPack.Len() > protoBufferSizeLimit || packCrawlStartTs.Add(protoDurationTimeLimit).Before(time.Now()) {
if pushEr := c.PushPackOfData(&blocksBufferPack, blocksIndexPack, txsIndexPack, eventsIndexPack, packStartBlock, tempEndBlock); pushEr != nil {
return fmt.Errorf("unable to push data correctly: %w", pushEr)
}

if err := indexer.WriteIndexesToDatabase(c.blockchain, interfaceTransactionIndex, "transaction"); err != nil {
return fmt.Errorf("failed to write transaction index to database: %w", err)
}
blocksBufferPack.Reset()
// Drop the already-written index packs so the next flush starts clean
blocksIndexPack = nil
txsIndexPack = nil
eventsIndexPack = nil

if err := indexer.WriteIndexesToDatabase(c.blockchain, interfaceEventsIndex, "log"); err != nil {
return fmt.Errorf("failed to write event index to database: %w", err)
packStartBlock = tempEndBlock + 1
packCrawlStartTs = time.Now()
}

return nil
@@ -264,6 +290,17 @@ func (c *Crawler) Start(threads int) {

c.startBlock = tempEndBlock + 1
}

if blocksBufferPack.Len() != 0 || packCrawlStartTs.Add(protoDurationTimeLimit).Before(time.Now()) {
if pushEr := c.PushPackOfData(&blocksBufferPack, blocksIndexPack, txsIndexPack, eventsIndexPack, packStartBlock, tempEndBlock); pushEr != nil {
log.Printf("Unable to push last pack of data correctly, err: %v", pushEr)
}

blocksBufferPack.Reset()

packStartBlock = tempEndBlock + 1
packCrawlStartTs = time.Now()
}
}

// TODO: methods here for additional functionalities
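The crawler.go changes replace the old save-per-batch flow with an accumulate-and-flush scheme: length-delimited protobuf blocks are appended to a single bytes.Buffer, and PushPackOfData is called once the buffer exceeds proto-size-limit or proto-time-limit has elapsed since the pack was started. A minimal, self-contained sketch of that size-or-time trigger; the payloads stand in for marshalled blocks and all names are illustrative:

```go
package main

import (
	"bytes"
	"fmt"
	"time"
)

// shouldFlush mirrors the condition used in Start: flush when the buffer grows
// past the size limit or the pack has been open longer than the time limit.
func shouldFlush(buf *bytes.Buffer, packStart time.Time, sizeLimit int, timeLimit time.Duration) bool {
	return buf.Len() > sizeLimit || time.Since(packStart) > timeLimit
}

func main() {
	const sizeLimit = 16              // bytes, tiny for the demo (the crawler uses Mb)
	const timeLimit = 2 * time.Second // the crawler takes --proto-time-limit in seconds

	var pack bytes.Buffer
	packStart := time.Now()

	for i := 0; i < 10; i++ {
		pack.Write([]byte("block")) // stand-in for protodelim.MarshalTo(&pack, block)

		if shouldFlush(&pack, packStart, sizeLimit, timeLimit) {
			fmt.Printf("flushing pack of %d bytes\n", pack.Len())
			pack.Reset()           // start a fresh pack, as Start does
			packStart = time.Now() // and restart the pack timer
		}
	}
}
```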
