Skip to content

Commit

Permalink
feat(backup): add batch feature to cmd
Browse files Browse the repository at this point in the history
  • Loading branch information
aeddi committed Sep 10, 2024
1 parent 32b37aa commit de25c21
Showing 1 changed file with 90 additions and 45 deletions.
135 changes: 90 additions & 45 deletions backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/gnolang/tx-archive/backup/writer"
"github.com/gnolang/tx-archive/log"
"github.com/gnolang/tx-archive/log/noop"
"github.com/gnolang/tx-archive/types"

_ "github.com/gnolang/gno/gno.land/pkg/sdk/vm"
)
Expand Down Expand Up @@ -59,60 +58,110 @@ func (s *Service) ExecuteBackup(ctx context.Context, cfg Config) error {
return fmt.Errorf("unable to determine right bound, %w", boundErr)
}

// Keep track of total txs backed up
totalTxs := uint64(0)
// Log info about what will be backed up
s.logger.Info(
"Existing blocks to backup",
"from block", cfg.FromBlock,
"to block", toBlock,
"total", toBlock-cfg.FromBlock+1,
)

// Keep track of what has been backed up
var results struct {
blocksFetched uint64
blocksWithTxs uint64
txsBackedUp uint64
}

fetchAndWrite := func(block uint64) error {
txs, txErr := s.client.GetBlockTransactions(block)
if txErr != nil {
return fmt.Errorf("unable to fetch block transactions, %w", txErr)
}
// Log results on exit
defer func() {
s.logger.Info(
"Total data backed up",
"blocks fetched", results.blocksFetched,
"blocks with transactions", results.blocksWithTxs,
"transactions written", results.txsBackedUp,
)
}()

// Internal function that fetches and writes a range of blocks
fetchAndWrite := func(fromBlock, toBlock uint64) error {
// Fetch by batches
for batchStart := fromBlock; batchStart <= toBlock; {
// Determine batch stop block
batchStop := batchStart + uint64(s.batchSize) - 1
if batchStop > toBlock {
batchStop = toBlock
}

// Skip empty blocks
if len(txs) == 0 {
return nil
}
batchSize := batchStop - batchStart + 1

// Save the block transaction data, if any
for _, tx := range txs {
data := &types.TxData{
Tx: tx,
BlockNum: block,
}
// Verbose log for blocks to be fetched
s.logger.Debug(
"Fetching batch of blocks",
"from", batchStart,
"to", batchStop,
"size", batchSize,
)

// Write the tx data to the file
if writeErr := s.writer.WriteTxData(data); writeErr != nil {
return fmt.Errorf("unable to write tx data, %w", writeErr)
// Fetch current batch
blocksTxs, txErr := s.client.GetBlocksTransactions(ctx, batchStart, batchStop)
if txErr != nil {
return fmt.Errorf("unable to fetch block transactions, %w", txErr)
}

totalTxs++
// Keep track of the number of fetched blocks & those containing transactions
results.blocksFetched += batchSize
results.blocksWithTxs += uint64(len(blocksTxs))

// Log the progress
s.logger.Info(
"Transaction backed up",
"total", totalTxs,
// Verbose log for blocks containing transactions
s.logger.Debug(
"Batch fetched successfully",
"blocks with transactions", fmt.Sprintf("%d/%d", len(blocksTxs), batchSize),
)
}

return nil
}
// Iterate over the list of blocks containing transactions
for _, txs := range blocksTxs {
for i, tx := range txs {
// Write the tx data to the file
if writeErr := s.writer.WriteTxData(tx); writeErr != nil {
return fmt.Errorf("unable to write tx data, %w", writeErr)
}

// Gather the chain data from the node
for block := cfg.FromBlock; block <= toBlock; block++ {
select {
case <-ctx.Done():
s.logger.Info("backup procedure stopped")
// Keep track of the number of backed up transactions
results.txsBackedUp++

return nil
default:
if fetchErr := fetchAndWrite(block); fetchErr != nil {
return fetchErr
// Verbose log for each transaction written
s.logger.Debug(
"Transaction backed up",
"blockNum", tx.BlockNum,
"tx count (block)", i+1,
"tx count (total)", results.txsBackedUp,
)
}
}

batchStart = batchStop + 1
}

return nil
}

// Backup the existing transactions
if fetchErr := fetchAndWrite(cfg.FromBlock, toBlock); fetchErr != nil {
return fetchErr
}

// Check if there needs to be a watcher setup
if cfg.Watch {
s.logger.Info(
"Existing blocks backup complete",
"blocks fetched", results.blocksFetched,
"blocks with transactions", results.blocksWithTxs,
"transactions written", results.txsBackedUp,
)

s.logger.Info("Watch for new blocks to backup")

ticker := time.NewTicker(s.watchInterval)
defer ticker.Stop()

Expand All @@ -121,7 +170,7 @@ func (s *Service) ExecuteBackup(ctx context.Context, cfg Config) error {
for {
select {
case <-ctx.Done():
s.logger.Info("export procedure stopped")
s.logger.Info("Stop watching for new blocks to backup")

return nil
case <-ticker.C:
Expand All @@ -137,10 +186,8 @@ func (s *Service) ExecuteBackup(ctx context.Context, cfg Config) error {
}

// Catch up to the latest block
for block := lastBlock + 1; block <= latest; block++ {
if fetchErr := fetchAndWrite(block); fetchErr != nil {
return fetchErr
}
if fetchErr := fetchAndWrite(lastBlock+1, latest); fetchErr != nil {
return fetchErr
}

// Update the last exported block
Expand All @@ -149,8 +196,6 @@ func (s *Service) ExecuteBackup(ctx context.Context, cfg Config) error {
}
}

s.logger.Info("Backup complete")

return nil
}

Expand Down

0 comments on commit de25c21

Please sign in to comment.