Skip to content

Commit

Permalink
Merge pull request #41 from moonstream-to/batch-commit
Browse files Browse the repository at this point in the history
Batch commit.
  • Loading branch information
Andrei-Dolgolev authored Jul 17, 2024
2 parents 282fb88 + 75b2b17 commit 36007ad
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 93 deletions.
20 changes: 7 additions & 13 deletions crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,35 +126,29 @@ func (c *Crawler) PushPackOfData(blocksBufferPack *bytes.Buffer, blocksIndexPack
log.Printf("Saved .proto blocks with transactions and events to %s", packRange)

// Save indexes data
var interfaceBlocksIndexPack []interface{}
var interfaceBlocksIndexPack []indexer.BlockIndex
for _, v := range blocksIndexPack {
v.Path = filepath.Join(c.basePath, packRange, "data.proto")
interfaceBlocksIndexPack = append(interfaceBlocksIndexPack, v)
}

var interfaceTxsIndexPack []interface{}
var interfaceTxsIndexPack []indexer.TransactionIndex
for _, v := range txsIndexPack {
v.Path = filepath.Join(c.basePath, packRange, "data.proto")
interfaceTxsIndexPack = append(interfaceTxsIndexPack, v)
}

var interfaceEventsIndexPack []interface{}
var interfaceEventsIndexPack []indexer.LogIndex
for _, v := range eventsIndexPack {
v.Path = filepath.Join(c.basePath, packRange, "data.proto")
interfaceEventsIndexPack = append(interfaceEventsIndexPack, v)
}

// TODO: Unite in one commit
if err := indexer.WriteIndexesToDatabase(c.blockchain, interfaceBlocksIndexPack, "block"); err != nil {
return fmt.Errorf("failed to write block index to database: %w", err)
}

if err := indexer.WriteIndexesToDatabase(c.blockchain, interfaceTxsIndexPack, "transaction"); err != nil {
return fmt.Errorf("failed to write transaction index to database: %w", err)
}
// Write indexes to database
err := indexer.WriteIndicesToDatabase(c.blockchain, interfaceBlocksIndexPack, interfaceTxsIndexPack, interfaceEventsIndexPack)

if err := indexer.WriteIndexesToDatabase(c.blockchain, interfaceEventsIndexPack, "log"); err != nil {
return fmt.Errorf("failed to write event index to database: %w", err)
if err != nil {
return fmt.Errorf("failed to write indices to database: %w", err)
}

return nil
Expand Down
120 changes: 104 additions & 16 deletions indexer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,9 @@ func updateValues(valuesMap map[string]UnnestInsertValueStruct, key string, valu
valuesMap[key] = tmp
}

// Batch insert
func (p *PostgreSQLpgx) executeBatchInsert(ctx context.Context, tableName string, columns []string, values map[string]UnnestInsertValueStruct, conflictClause string) error {
func (p *PostgreSQLpgx) WriteIndexes(blockchain string, blocksIndexPack []BlockIndex, transactionsIndexPack []TransactionIndex, logsIndexPack []LogIndex) error {

ctx := context.Background()
pool := p.GetPool()
conn, err := pool.Acquire(ctx)
if err != nil {
Expand All @@ -314,6 +315,36 @@ func (p *PostgreSQLpgx) executeBatchInsert(ctx context.Context, tableName string
}
}()

// Write blocks index
if len(blocksIndexPack) > 0 {
err = p.writeBlockIndexToDB(tx, blockchain, blocksIndexPack)
if err != nil {
return err
}
}

// Write transactions index
if len(transactionsIndexPack) > 0 {
err = p.writeTransactionIndexToDB(tx, blockchain, transactionsIndexPack)
if err != nil {
return err
}
}

// Write logs index
if len(logsIndexPack) > 0 {
err = p.writeLogIndexToDB(tx, blockchain, logsIndexPack)
if err != nil {
return err
}
}

return nil
}

// Batch insert
func (p *PostgreSQLpgx) executeBatchInsert(tx pgx.Tx, ctx context.Context, tableName string, columns []string, values map[string]UnnestInsertValueStruct, conflictClause string) error {

types := make([]string, 0)

for index, column := range columns {
Expand All @@ -331,6 +362,8 @@ func (p *PostgreSQLpgx) executeBatchInsert(ctx context.Context, tableName string
valuesSlice = append(valuesSlice, values[column].Values)
}

// track execution time

if _, err := tx.Exec(ctx, query, valuesSlice...); err != nil {
fmt.Println("Error executing bulk insert", err)
return fmt.Errorf("error executing bulk insert for batch: %w", err)
Expand All @@ -339,8 +372,8 @@ func (p *PostgreSQLpgx) executeBatchInsert(ctx context.Context, tableName string
return nil
}

func (p *PostgreSQLpgx) writeBlockIndexToDB(blockchain string, indexes []BlockIndex) error {
tableName := blockchain + "_blocks"
func (p *PostgreSQLpgx) writeBlockIndexToDB(tx pgx.Tx, blockchain string, indexes []BlockIndex) error {
tableName := BlocksTableName(blockchain)
isBlockchainWithL1Chain := IsBlockchainWithL1Chain(blockchain)
columns := []string{"block_number", "block_hash", "block_timestamp", "parent_hash", "row_id", "path"}

Expand Down Expand Up @@ -399,18 +432,20 @@ func (p *PostgreSQLpgx) writeBlockIndexToDB(blockchain string, indexes []BlockIn
}

ctx := context.Background()
err = p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (block_number) DO NOTHING")
err = p.executeBatchInsert(tx, ctx, tableName, columns, valuesMap, "ON CONFLICT (block_number) DO NOTHING")

if err != nil {
return err
}

log.Printf("Saved %d records into %s table", len(indexes), tableName)
log.Printf("Add %d records to transaction into %s table", len(indexes), tableName)

return nil
}

func (p *PostgreSQLpgx) writeTransactionIndexToDB(tableName string, indexes []TransactionIndex) error {
func (p *PostgreSQLpgx) writeTransactionIndexToDB(tx pgx.Tx, blockchain string, indexes []TransactionIndex) error {

tableName := TransactionsTableName(blockchain)

columns := []string{"block_number", "block_hash", "hash", "index", "type", "from_address", "to_address", "selector", "row_id", "path"}
var valuesMap = make(map[string]UnnestInsertValueStruct)
Expand Down Expand Up @@ -495,18 +530,20 @@ func (p *PostgreSQLpgx) writeTransactionIndexToDB(tableName string, indexes []Tr

ctx := context.Background()

err = p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (hash) DO NOTHING")
err = p.executeBatchInsert(tx, ctx, tableName, columns, valuesMap, "ON CONFLICT (hash) DO NOTHING")

if err != nil {
return err
}

log.Printf("Saved %d records into %s table", len(indexes), tableName)
log.Printf("Add %d records to transaction into %s table", len(indexes), tableName)

return nil
}

func (p *PostgreSQLpgx) writeLogIndexToDB(tableName string, indexes []LogIndex) error {
func (p *PostgreSQLpgx) writeLogIndexToDB(tx pgx.Tx, blockchain string, indexes []LogIndex) error {

tableName := LogsTableName(blockchain)

columns := []string{"transaction_hash", "block_hash", "address", "selector", "topic1", "topic2", "topic3", "row_id", "log_index", "path"}

Expand Down Expand Up @@ -584,13 +621,14 @@ func (p *PostgreSQLpgx) writeLogIndexToDB(tableName string, indexes []LogIndex)
}

ctx := context.Background()
err = p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash, log_index) DO NOTHING")

err = p.executeBatchInsert(tx, ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash, log_index) DO NOTHING")

if err != nil {
return err
}

log.Printf("Saved %d records into %s table", len(indexes), tableName)
log.Printf("Add %d records to transaction into %s table", len(indexes), tableName)

return nil
}
Expand Down Expand Up @@ -1027,7 +1065,57 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, toBlock

}

func (p *PostgreSQLpgx) WriteEvents(blockchain string, events []EventLabel) error {
// WriteLabes writes the given transaction labels and event labels for
// blockchain inside a single database transaction, so that either both
// sets are persisted or neither is.
//
// Empty slices are skipped. If any write fails, the transaction is rolled
// back and that error is returned. If every write succeeds, the transaction
// is committed and the commit error (if any) is returned. A panic raised
// while writing triggers a rollback and is then re-raised.
//
// NOTE(review): the method name is misspelled ("Labes") but is kept as-is
// because callers reference it under this name.
func (p *PostgreSQLpgx) WriteLabes(
	blockchain string,
	transactions []TransactionLabel,
	events []EventLabel,
) (err error) {
	ctx := context.Background()

	conn, err := p.GetPool().Acquire(ctx)
	if err != nil {
		return err
	}
	defer conn.Release()

	tx, err := conn.Begin(ctx)
	if err != nil {
		return err
	}

	// err is a named result so this closure observes the error the function
	// is about to return and can replace it with the commit error. The
	// recovered value gets its own name: reusing "err" for it would shadow
	// the result and make the rollback branch unreachable.
	defer func() {
		if r := recover(); r != nil {
			// Best-effort rollback; the panic is what gets reported.
			_ = tx.Rollback(ctx)
			panic(r)
		}
		if err != nil {
			// Best-effort rollback; the original write error is what the
			// caller needs to see.
			_ = tx.Rollback(ctx)
			return
		}
		err = tx.Commit(ctx)
	}()

	if len(transactions) > 0 {
		// Plain assignment (not :=) so the deferred closure sees the failure.
		if err = p.WriteTransactions(tx, blockchain, transactions); err != nil {
			return err
		}
	}

	if len(events) > 0 {
		if err = p.WriteEvents(tx, blockchain, events); err != nil {
			return err
		}
	}

	return nil
}

func (p *PostgreSQLpgx) WriteEvents(tx pgx.Tx, blockchain string, events []EventLabel) error {

tableName := LabelsTableName(blockchain)
columns := []string{"id", "label", "transaction_hash", "log_index", "block_number", "block_hash", "block_timestamp", "caller_address", "origin_address", "address", "label_name", "label_type", "label_data"}
Expand Down Expand Up @@ -1138,7 +1226,7 @@ func (p *PostgreSQLpgx) WriteEvents(blockchain string, events []EventLabel) erro

ctx := context.Background()

err := p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash, log_index) where label='seer' and label_type = 'event' DO NOTHING")
err := p.executeBatchInsert(tx, ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash, log_index) where label='seer' and label_type = 'event' DO NOTHING")

if err != nil {
return err
Expand All @@ -1149,7 +1237,7 @@ func (p *PostgreSQLpgx) WriteEvents(blockchain string, events []EventLabel) erro
return nil
}

func (p *PostgreSQLpgx) WriteTransactions(blockchain string, transactions []TransactionLabel) error {
func (p *PostgreSQLpgx) WriteTransactions(tx pgx.Tx, blockchain string, transactions []TransactionLabel) error {
tableName := LabelsTableName(blockchain)
columns := []string{"id", "address", "block_number", "block_hash", "caller_address", "label_name", "label_type", "origin_address", "label", "transaction_hash", "label_data", "block_timestamp"}

Expand Down Expand Up @@ -1254,7 +1342,7 @@ func (p *PostgreSQLpgx) WriteTransactions(blockchain string, transactions []Tran

ctx := context.Background()

err := p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash) WHERE label='seer' AND label_type='tx_call' DO NOTHING")
err := p.executeBatchInsert(tx, ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash) WHERE label='seer' AND label_type='tx_call' DO NOTHING")

if err != nil {
return err
Expand Down
51 changes: 4 additions & 47 deletions indexer/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package indexer

import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -146,51 +145,9 @@ func filterDuplicates(indexType string, existingIndices, newIndices []interface{
return nil
}

func WriteIndexesToDatabase(blockchain string, indexes []interface{}, indexType string) error {
// Implement logic to write indexes to the database
// WriteIndicesToDatabase writes the given indices to the database
func WriteIndicesToDatabase(blockchain string, blocks []BlockIndex, transactions []TransactionIndex, logs []LogIndex) error {
// Write block, transaction, and log indices through the shared database connection

if len(indexes) == 0 {
return nil
}

switch indexType {
case "block":
var blockIndexes []BlockIndex
for _, i := range indexes {
index, ok := i.(BlockIndex)
if !ok {
return errors.New("invalid type for block index")
}
index.chain = blockchain
blockIndexes = append(blockIndexes, index)
}
return DBConnection.writeBlockIndexToDB(blockchain, blockIndexes)

case "transaction":
var transactionIndexes []TransactionIndex
for _, i := range indexes {
index, ok := i.(TransactionIndex)
if !ok {
return errors.New("invalid type for transaction index")
}
index.chain = blockchain
transactionIndexes = append(transactionIndexes, index)
}
return DBConnection.writeTransactionIndexToDB(blockchain+"_transactions", transactionIndexes)

case "log":
var logIndexes []LogIndex
for _, i := range indexes {
index, ok := i.(LogIndex)
if !ok {
return errors.New("invalid type for log index")
}
index.chain = blockchain
logIndexes = append(logIndexes, index)
}
return DBConnection.writeLogIndexToDB(blockchain+"_logs", logIndexes)

default:
return errors.New("unsupported index type")
}
return DBConnection.WriteIndexes(blockchain, blocks, transactions, logs)
}
18 changes: 1 addition & 17 deletions synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,23 +399,7 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) {
decodedTransactionsPack = append(decodedTransactionsPack, decodedTransactions...)
}

if len(decodedEventsPack) > 0 {
// Write events to user RDS
customer.Pgx.WriteEvents(
d.blockchain,
decodedEventsPack,
)
}

// // Transactions

// Write transactions to user RDS
if len(decodedTransactionsPack) > 0 {
customer.Pgx.WriteTransactions(
d.blockchain,
decodedTransactionsPack,
)
}
customer.Pgx.WriteLabes(d.blockchain, decodedTransactionsPack, decodedEventsPack)

<-sem
}(update)
Expand Down

0 comments on commit 36007ad

Please sign in to comment.