diff --git a/cmd.go b/cmd.go
index 6b50602..37e4700 100644
--- a/cmd.go
+++ b/cmd.go
@@ -278,7 +278,7 @@ func CreateCrawlerCommand() *cobra.Command {
 	crawlerCmd.Flags().StringVar(&baseDir, "base-dir", "", "The base directory to store the crawled data (default: '')")
 	crawlerCmd.Flags().BoolVar(&force, "force", false, "Set this flag to force the crawler start from the specified block, otherwise it checks database latest indexed block number (default: false)")
 	crawlerCmd.Flags().IntVar(&protoSizeLimit, "proto-size-limit", 100, "Proto file size limit in Mb (default: 100Mb)")
-	crawlerCmd.Flags().IntVar(&protoTimeLimit, "proto-time-limit", 5, "Proto time limit in minutes (default: 5min)")
+	crawlerCmd.Flags().IntVar(&protoTimeLimit, "proto-time-limit", 300, "Proto time limit in seconds (default: 300sec)")
 
 	return crawlerCmd
 }
diff --git a/crawler/crawler.go b/crawler/crawler.go
index 72cecb0..c4f3a83 100644
--- a/crawler/crawler.go
+++ b/crawler/crawler.go
@@ -116,10 +116,56 @@ type BlocksBufferBatch struct {
 	Buffer bytes.Buffer
 }
 
+// PushPackOfData saves the accumulated pack of proto-encoded blocks to storage under its
+// block range and writes the corresponding block, transaction, and event indexes to the database.
+func (c *Crawler) PushPackOfData(blocksBufferPack *bytes.Buffer, blocksIndexPack []indexer.BlockIndex, txsIndexPack []indexer.TransactionIndex, eventsIndexPack []indexer.LogIndex, packStartBlock, packEndBlock int64) error {
+	packRange := fmt.Sprintf("%d-%d", packStartBlock, packEndBlock)
+
+	// Save proto data
+	if err := c.StorageInstance.Save(packRange, "data.proto", *blocksBufferPack); err != nil {
+		return fmt.Errorf("failed to save data.proto: %w", err)
+	}
+	log.Printf("Saved .proto blocks with transactions and events to %s", packRange)
+
+	// Save index data
+	var interfaceBlocksIndexPack []interface{}
+	for _, v := range blocksIndexPack {
+		v.Path = filepath.Join(c.basePath, packRange, "data.proto")
+		interfaceBlocksIndexPack = append(interfaceBlocksIndexPack, v)
+	}
+
+	var interfaceTxsIndexPack []interface{}
+	for _, v := range txsIndexPack {
+		v.Path = filepath.Join(c.basePath, packRange, "data.proto")
+		interfaceTxsIndexPack = append(interfaceTxsIndexPack, v)
+	}
+
+	var interfaceEventsIndexPack []interface{}
+	for _, v := range eventsIndexPack {
+		v.Path = filepath.Join(c.basePath, packRange, "data.proto")
+		interfaceEventsIndexPack = append(interfaceEventsIndexPack, v)
+	}
+
+	// TODO: Unite in one commit
+	if err := indexer.WriteIndexesToDatabase(c.blockchain, interfaceBlocksIndexPack, "block"); err != nil {
+		return fmt.Errorf("failed to write block index to database: %w", err)
+	}
+
+	if err := indexer.WriteIndexesToDatabase(c.blockchain, interfaceTxsIndexPack, "transaction"); err != nil {
+		return fmt.Errorf("failed to write transaction index to database: %w", err)
+	}
+
+	if err := indexer.WriteIndexesToDatabase(c.blockchain, interfaceEventsIndexPack, "log"); err != nil {
+		return fmt.Errorf("failed to write event index to database: %w", err)
+	}
+
+	return nil
+}
+
 // Start initiates the crawling process for the configured blockchain.
 func (c *Crawler) Start(threads int) {
-	// protoBufferSizeLimit := c.protoSizeLimit * 1024 * 1024 // In Mb
-	// protoDurationTimeLimit := time.Duration(c.protoTimeLimit) * time.Minute
+	protoBufferSizeLimit := c.protoSizeLimit * 1024 * 1024 // Convert MB to bytes
+	protoDurationTimeLimit := time.Duration(c.protoTimeLimit) * time.Second
 
 	batchSize := int64(10)
 
@@ -148,8 +194,13 @@ func (c *Crawler) Start(threads int) {
 		}
 	}
 
-	// var blocksBufferBatch BlocksBufferBatch
-	// blocksBufferBatchEncoder := json.NewEncoder(&blocksBufferBatch.Buffer)
+	// Accumulators for the current pack, flushed once the size or time limit is reached
+	var blocksBufferPack bytes.Buffer
+	packCrawlStartTs := time.Now()
+	var blocksIndexPack []indexer.BlockIndex
+	var txsIndexPack []indexer.TransactionIndex
+	var eventsIndexPack []indexer.LogIndex
+	packStartBlock := c.startBlock
 
 	tempEndBlock := c.startBlock + batchSize
 	var safeBlock int64
@@ -201,8 +252,7 @@ func (c *Crawler) Start(threads int) {
 
 		// Retry the operation in case of failure with cumulative attempts
 		err = retryOperation(retryAttempts, retryWaitTime, func() error {
-			batchDir := fmt.Sprintf("%d-%d", c.startBlock, tempEndBlock)
-			log.Printf("Operates with batch of blocks: %s", batchDir)
+			log.Printf("Processing batch of blocks: %d-%d", c.startBlock, tempEndBlock)
 
 			// Fetch blocks with transactions
 			blocks, blocksIndex, txsIndex, eventsIndex, crawlErr := seer_blockchain.CrawlEntireBlocks(c.Client, big.NewInt(c.startBlock), big.NewInt(tempEndBlock), SEER_CRAWLER_DEBUG, threads)
@@ -210,46 +260,30 @@ func (c *Crawler) Start(threads int) {
 				return fmt.Errorf("failed to crawl blocks, txs and events: %w", err)
 			}
 
-			// Save .proto data
-			var blocksBuffer bytes.Buffer
+			// Append length-delimited .proto records to the pack buffer
 			for _, block := range blocks {
-				protodelim.MarshalTo(&blocksBuffer, block)
-			}
-			if err := c.StorageInstance.Save(batchDir, "data.proto", blocksBuffer); err != nil {
-				return fmt.Errorf("failed to save data.proto: %w", err)
-			}
-			log.Printf("Saved .proto blocks with transactions and events to %s", batchDir)
-
-			// Save indexes data
-			interfaceBlockIndex := make([]interface{}, len(blocksIndex))
-			for i, v := range blocksIndex {
-				v.Path = filepath.Join(c.basePath, batchDir, "data.proto")
-				interfaceBlockIndex[i] = v
+				if _, marshalErr := protodelim.MarshalTo(&blocksBufferPack, block); marshalErr != nil {
+					return fmt.Errorf("failed to marshal block to pack buffer: %w", marshalErr)
+				}
 			}
 
-			interfaceTransactionIndex := make([]interface{}, len(txsIndex))
-			for i, v := range txsIndex {
-				v.Path = filepath.Join(c.basePath, batchDir, "data.proto")
-				interfaceTransactionIndex[i] = v
-			}
-
-			interfaceEventsIndex := make([]interface{}, len(eventsIndex))
-			for i, v := range eventsIndex {
-				v.Path = filepath.Join(c.basePath, batchDir, "data.proto")
-				interfaceEventsIndex[i] = v
-			}
+			blocksIndexPack = append(blocksIndexPack, blocksIndex...)
+			txsIndexPack = append(txsIndexPack, txsIndex...)
+			eventsIndexPack = append(eventsIndexPack, eventsIndex...)
 
-			// TODO: Unite in one commit
-			if err := indexer.WriteIndexesToDatabase(c.blockchain, interfaceBlockIndex, "block"); err != nil {
-				return fmt.Errorf("failed to write block index to database: %w", err)
-			}
+			// Flush the pack once it outgrows the size limit or the time limit has elapsed
+			if blocksBufferPack.Len() > protoBufferSizeLimit || packCrawlStartTs.Add(protoDurationTimeLimit).Before(time.Now()) {
+				if pushErr := c.PushPackOfData(&blocksBufferPack, blocksIndexPack, txsIndexPack, eventsIndexPack, packStartBlock, tempEndBlock); pushErr != nil {
+					return fmt.Errorf("unable to push pack of data: %w", pushErr)
+				}
 
-			if err := indexer.WriteIndexesToDatabase(c.blockchain, interfaceTransactionIndex, "transaction"); err != nil {
-				return fmt.Errorf("failed to write transaction index to database: %w", err)
-			}
+				blocksBufferPack.Reset()
+				blocksIndexPack = nil
+				txsIndexPack = nil
+				eventsIndexPack = nil
 
-			if err := indexer.WriteIndexesToDatabase(c.blockchain, interfaceEventsIndex, "log"); err != nil {
-				return fmt.Errorf("failed to write event index to database: %w", err)
+				packStartBlock = tempEndBlock + 1
+				packCrawlStartTs = time.Now()
 			}
 
 			return nil
@@ -264,6 +298,18 @@ func (c *Crawler) Start(threads int) {
 
 		c.startBlock = tempEndBlock + 1
 	}
+
+	// Flush whatever remains in the final, partially filled pack
+	if blocksBufferPack.Len() != 0 {
+		if pushErr := c.PushPackOfData(&blocksBufferPack, blocksIndexPack, txsIndexPack, eventsIndexPack, packStartBlock, tempEndBlock); pushErr != nil {
+			log.Printf("Unable to push last pack of data, err: %v", pushErr)
+		}
+
+		blocksBufferPack.Reset()
+
+		packStartBlock = tempEndBlock + 1
+		packCrawlStartTs = time.Now()
+	}
 }
 
 // TODO: methods here for additional functionalities
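A note for reviewers on the pack format: each pack now concatenates every block in a range into a single data.proto object, so consumers have to read it back with the same length-delimited framing that protodelim.MarshalTo writes. Below is a minimal, self-contained sketch of the round-trip; wrapperspb.StringValue is a stand-in for the crawler's generated block message and is an assumption for illustration only.

package main

import (
	"bufio"
	"bytes"
	"errors"
	"fmt"
	"io"
	"log"

	"google.golang.org/protobuf/encoding/protodelim"
	"google.golang.org/protobuf/types/known/wrapperspb"
)

func main() {
	var pack bytes.Buffer

	// Write side: each message is appended with a varint length prefix,
	// the same way Start appends every block to blocksBufferPack.
	for _, s := range []string{"block-1", "block-2", "block-3"} {
		if _, err := protodelim.MarshalTo(&pack, wrapperspb.String(s)); err != nil {
			log.Fatalf("marshal: %v", err)
		}
	}

	// Read side: the same framing is required; bufio.Reader satisfies the
	// io.ByteReader requirement of protodelim.UnmarshalFrom.
	r := bufio.NewReader(&pack)
	for {
		msg := &wrapperspb.StringValue{}
		if err := protodelim.UnmarshalFrom(r, msg); errors.Is(err, io.EOF) {
			break
		} else if err != nil {
			log.Fatalf("unmarshal: %v", err)
		}
		fmt.Println(msg.GetValue())
	}
}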
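A second note, on flush timing: the policy in Start fires on whichever limit is crossed first, the pack buffer outgrowing --proto-size-limit (MB) or --proto-time-limit (now in seconds) elapsing since the pack was started. A standalone sketch of the same dual trigger follows; the flushPolicy type and its names are illustrative, not part of this PR.

package main

import (
	"bytes"
	"fmt"
	"time"
)

// flushPolicy mirrors the two limits used in Start: flush when the pack
// buffer outgrows sizeLimit, or when timeLimit has elapsed since the pack
// was started, whichever happens first.
type flushPolicy struct {
	sizeLimit int           // bytes, i.e. protoSizeLimit * 1024 * 1024
	timeLimit time.Duration // i.e. time.Duration(protoTimeLimit) * time.Second
	startedAt time.Time
}

func (p *flushPolicy) shouldFlush(buf *bytes.Buffer) bool {
	return buf.Len() > p.sizeLimit || time.Since(p.startedAt) > p.timeLimit
}

// markFlushed restarts the clock after a successful push, the same way
// Start resets packCrawlStartTs.
func (p *flushPolicy) markFlushed() {
	p.startedAt = time.Now()
}

func main() {
	policy := flushPolicy{
		sizeLimit: 100 * 1024 * 1024,
		timeLimit: 300 * time.Second,
		startedAt: time.Now(),
	}

	var buf bytes.Buffer
	buf.WriteString("pretend this is a pack of length-delimited blocks")

	fmt.Println("flush now?", policy.shouldFlush(&buf)) // false: small and fresh
	policy.markFlushed()
}

One consequence of the deadline living inside the retry closure is that a quiet chain flushes only when the next batch arrives; the time limit is checked after a crawl, not on an independent timer.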
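Finally, PushPackOfData still carries the "TODO: Unite in one commit": the three WriteIndexesToDatabase calls are independent writes, so a failure partway through can leave block indexes committed without their transaction and log counterparts. With the standard library's database/sql the intended shape looks like the sketch below; it assumes direct *sql.DB access and a caller-supplied write callback, whereas seer's indexer wraps its own storage layer, so treat this as a pattern rather than a drop-in change.

package sketch

import (
	"context"
	"database/sql"
	"fmt"
)

// writeAllIndexesInOneTx shows the shape the TODO asks for: the block,
// transaction, and log index writes share one transaction, so a failure
// in any of them rolls back the others instead of leaving partial indexes.
func writeAllIndexesInOneTx(ctx context.Context, db *sql.DB, write func(tx *sql.Tx, kind string) error) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("begin tx: %w", err)
	}
	// Rollback is a no-op once Commit has succeeded.
	defer tx.Rollback()

	for _, kind := range []string{"block", "transaction", "log"} {
		if err := write(tx, kind); err != nil {
			return fmt.Errorf("write %s indexes: %w", kind, err)
		}
	}

	return tx.Commit()
}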