Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch commit. #41

Merged
merged 2 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 7 additions & 13 deletions crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,35 +126,29 @@ func (c *Crawler) PushPackOfData(blocksBufferPack *bytes.Buffer, blocksIndexPack
log.Printf("Saved .proto blocks with transactions and events to %s", packRange)

// Save indexes data
var interfaceBlocksIndexPack []interface{}
var interfaceBlocksIndexPack []indexer.BlockIndex
for _, v := range blocksIndexPack {
v.Path = filepath.Join(c.basePath, packRange, "data.proto")
interfaceBlocksIndexPack = append(interfaceBlocksIndexPack, v)
}

var interfaceTxsIndexPack []interface{}
var interfaceTxsIndexPack []indexer.TransactionIndex
for _, v := range txsIndexPack {
v.Path = filepath.Join(c.basePath, packRange, "data.proto")
interfaceTxsIndexPack = append(interfaceTxsIndexPack, v)
}

var interfaceEventsIndexPack []interface{}
var interfaceEventsIndexPack []indexer.LogIndex
for _, v := range eventsIndexPack {
v.Path = filepath.Join(c.basePath, packRange, "data.proto")
interfaceEventsIndexPack = append(interfaceEventsIndexPack, v)
}

// TODO: Unite in one commit
if err := indexer.WriteIndexesToDatabase(c.blockchain, interfaceBlocksIndexPack, "block"); err != nil {
return fmt.Errorf("failed to write block index to database: %w", err)
}

if err := indexer.WriteIndexesToDatabase(c.blockchain, interfaceTxsIndexPack, "transaction"); err != nil {
return fmt.Errorf("failed to write transaction index to database: %w", err)
}
// Write indexes to database
err := indexer.WriteIndicesToDatabase(c.blockchain, interfaceBlocksIndexPack, interfaceTxsIndexPack, interfaceEventsIndexPack)

if err := indexer.WriteIndexesToDatabase(c.blockchain, interfaceEventsIndexPack, "log"); err != nil {
return fmt.Errorf("failed to write event index to database: %w", err)
if err != nil {
return fmt.Errorf("failed to write indices to database: %w", err)
}

return nil
Expand Down
120 changes: 104 additions & 16 deletions indexer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,9 @@ func updateValues(valuesMap map[string]UnnestInsertValueStruct, key string, valu
valuesMap[key] = tmp
}

// Batch insert
func (p *PostgreSQLpgx) executeBatchInsert(ctx context.Context, tableName string, columns []string, values map[string]UnnestInsertValueStruct, conflictClause string) error {
func (p *PostgreSQLpgx) WriteIndexes(blockchain string, blocksIndexPack []BlockIndex, transactionsIndexPack []TransactionIndex, logsIndexPack []LogIndex) error {

ctx := context.Background()
pool := p.GetPool()
conn, err := pool.Acquire(ctx)
if err != nil {
Expand All @@ -314,6 +315,36 @@ func (p *PostgreSQLpgx) executeBatchInsert(ctx context.Context, tableName string
}
}()

// Write blocks index
if len(blocksIndexPack) > 0 {
err = p.writeBlockIndexToDB(tx, blockchain, blocksIndexPack)
if err != nil {
return err
}
}

// Write transactions index
if len(transactionsIndexPack) > 0 {
err = p.writeTransactionIndexToDB(tx, blockchain, transactionsIndexPack)
if err != nil {
return err
}
}

// Write logs index
if len(logsIndexPack) > 0 {
err = p.writeLogIndexToDB(tx, blockchain, logsIndexPack)
if err != nil {
return err
}
}

return nil
}

// Batch insert
func (p *PostgreSQLpgx) executeBatchInsert(tx pgx.Tx, ctx context.Context, tableName string, columns []string, values map[string]UnnestInsertValueStruct, conflictClause string) error {

types := make([]string, 0)

for index, column := range columns {
Expand All @@ -331,6 +362,8 @@ func (p *PostgreSQLpgx) executeBatchInsert(ctx context.Context, tableName string
valuesSlice = append(valuesSlice, values[column].Values)
}

// track execution time

if _, err := tx.Exec(ctx, query, valuesSlice...); err != nil {
fmt.Println("Error executing bulk insert", err)
return fmt.Errorf("error executing bulk insert for batch: %w", err)
Expand All @@ -339,8 +372,8 @@ func (p *PostgreSQLpgx) executeBatchInsert(ctx context.Context, tableName string
return nil
}

func (p *PostgreSQLpgx) writeBlockIndexToDB(blockchain string, indexes []BlockIndex) error {
tableName := blockchain + "_blocks"
func (p *PostgreSQLpgx) writeBlockIndexToDB(tx pgx.Tx, blockchain string, indexes []BlockIndex) error {
tableName := BlocksTableName(blockchain)
isBlockchainWithL1Chain := IsBlockchainWithL1Chain(blockchain)
columns := []string{"block_number", "block_hash", "block_timestamp", "parent_hash", "row_id", "path"}

Expand Down Expand Up @@ -399,18 +432,20 @@ func (p *PostgreSQLpgx) writeBlockIndexToDB(blockchain string, indexes []BlockIn
}

ctx := context.Background()
err = p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (block_number) DO NOTHING")
err = p.executeBatchInsert(tx, ctx, tableName, columns, valuesMap, "ON CONFLICT (block_number) DO NOTHING")

if err != nil {
return err
}

log.Printf("Saved %d records into %s table", len(indexes), tableName)
log.Printf("Add %d records to transaction into %s table", len(indexes), tableName)

return nil
}

func (p *PostgreSQLpgx) writeTransactionIndexToDB(tableName string, indexes []TransactionIndex) error {
func (p *PostgreSQLpgx) writeTransactionIndexToDB(tx pgx.Tx, blockchain string, indexes []TransactionIndex) error {

tableName := TransactionsTableName(blockchain)

columns := []string{"block_number", "block_hash", "hash", "index", "type", "from_address", "to_address", "selector", "row_id", "path"}
var valuesMap = make(map[string]UnnestInsertValueStruct)
Expand Down Expand Up @@ -495,18 +530,20 @@ func (p *PostgreSQLpgx) writeTransactionIndexToDB(tableName string, indexes []Tr

ctx := context.Background()

err = p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (hash) DO NOTHING")
err = p.executeBatchInsert(tx, ctx, tableName, columns, valuesMap, "ON CONFLICT (hash) DO NOTHING")

if err != nil {
return err
}

log.Printf("Saved %d records into %s table", len(indexes), tableName)
log.Printf("Add %d records to transaction into %s table", len(indexes), tableName)

return nil
}

func (p *PostgreSQLpgx) writeLogIndexToDB(tableName string, indexes []LogIndex) error {
func (p *PostgreSQLpgx) writeLogIndexToDB(tx pgx.Tx, blockchain string, indexes []LogIndex) error {

tableName := LogsTableName(blockchain)

columns := []string{"transaction_hash", "block_hash", "address", "selector", "topic1", "topic2", "topic3", "row_id", "log_index", "path"}

Expand Down Expand Up @@ -584,13 +621,14 @@ func (p *PostgreSQLpgx) writeLogIndexToDB(tableName string, indexes []LogIndex)
}

ctx := context.Background()
err = p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash, log_index) DO NOTHING")

err = p.executeBatchInsert(tx, ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash, log_index) DO NOTHING")

if err != nil {
return err
}

log.Printf("Saved %d records into %s table", len(indexes), tableName)
log.Printf("Add %d records to transaction into %s table", len(indexes), tableName)

return nil
}
Expand Down Expand Up @@ -1027,7 +1065,57 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, toBlock

}

func (p *PostgreSQLpgx) WriteEvents(blockchain string, events []EventLabel) error {
func (p *PostgreSQLpgx) WriteLabes(
blockchain string,
transactions []TransactionLabel,
events []EventLabel,
) error {

pool := p.GetPool()

conn, err := pool.Acquire(context.Background())

if err != nil {
return err
}

defer conn.Release()

tx, err := conn.Begin(context.Background())

if err != nil {
return err
}

defer func() {
if err := recover(); err != nil {
tx.Rollback(context.Background())
panic(err)
} else if err != nil {
tx.Rollback(context.Background())
} else {
err = tx.Commit(context.Background())
}
}()

if len(transactions) > 0 {
err := p.WriteTransactions(tx, blockchain, transactions)
if err != nil {
return err
}
}

if len(events) > 0 {
err := p.WriteEvents(tx, blockchain, events)
if err != nil {
return err
}
}

return nil
}

func (p *PostgreSQLpgx) WriteEvents(tx pgx.Tx, blockchain string, events []EventLabel) error {

tableName := LabelsTableName(blockchain)
columns := []string{"id", "label", "transaction_hash", "log_index", "block_number", "block_hash", "block_timestamp", "caller_address", "origin_address", "address", "label_name", "label_type", "label_data"}
Expand Down Expand Up @@ -1138,7 +1226,7 @@ func (p *PostgreSQLpgx) WriteEvents(blockchain string, events []EventLabel) erro

ctx := context.Background()

err := p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash, log_index) where label='seer' and label_type = 'event' DO NOTHING")
err := p.executeBatchInsert(tx, ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash, log_index) where label='seer' and label_type = 'event' DO NOTHING")

if err != nil {
return err
Expand All @@ -1149,7 +1237,7 @@ func (p *PostgreSQLpgx) WriteEvents(blockchain string, events []EventLabel) erro
return nil
}

func (p *PostgreSQLpgx) WriteTransactions(blockchain string, transactions []TransactionLabel) error {
func (p *PostgreSQLpgx) WriteTransactions(tx pgx.Tx, blockchain string, transactions []TransactionLabel) error {
tableName := LabelsTableName(blockchain)
columns := []string{"id", "address", "block_number", "block_hash", "caller_address", "label_name", "label_type", "origin_address", "label", "transaction_hash", "label_data", "block_timestamp"}

Expand Down Expand Up @@ -1254,7 +1342,7 @@ func (p *PostgreSQLpgx) WriteTransactions(blockchain string, transactions []Tran

ctx := context.Background()

err := p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash) WHERE label='seer' AND label_type='tx_call' DO NOTHING")
err := p.executeBatchInsert(tx, ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash) WHERE label='seer' AND label_type='tx_call' DO NOTHING")

if err != nil {
return err
Expand Down
51 changes: 4 additions & 47 deletions indexer/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package indexer

import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -146,51 +145,9 @@ func filterDuplicates(indexType string, existingIndices, newIndices []interface{
return nil
}

func WriteIndexesToDatabase(blockchain string, indexes []interface{}, indexType string) error {
// Implement logic to write indexes to the database
// WriteIndicesToDatabase writes the given indices to the database
func WriteIndicesToDatabase(blockchain string, blocks []BlockIndex, transactions []TransactionIndex, logs []LogIndex) error {
// Write block indices

if len(indexes) == 0 {
return nil
}

switch indexType {
case "block":
var blockIndexes []BlockIndex
for _, i := range indexes {
index, ok := i.(BlockIndex)
if !ok {
return errors.New("invalid type for block index")
}
index.chain = blockchain
blockIndexes = append(blockIndexes, index)
}
return DBConnection.writeBlockIndexToDB(blockchain, blockIndexes)

case "transaction":
var transactionIndexes []TransactionIndex
for _, i := range indexes {
index, ok := i.(TransactionIndex)
if !ok {
return errors.New("invalid type for transaction index")
}
index.chain = blockchain
transactionIndexes = append(transactionIndexes, index)
}
return DBConnection.writeTransactionIndexToDB(blockchain+"_transactions", transactionIndexes)

case "log":
var logIndexes []LogIndex
for _, i := range indexes {
index, ok := i.(LogIndex)
if !ok {
return errors.New("invalid type for log index")
}
index.chain = blockchain
logIndexes = append(logIndexes, index)
}
return DBConnection.writeLogIndexToDB(blockchain+"_logs", logIndexes)

default:
return errors.New("unsupported index type")
}
return DBConnection.WriteIndexes(blockchain, blocks, transactions, logs)
}
18 changes: 1 addition & 17 deletions synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,23 +399,7 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) {
decodedTransactionsPack = append(decodedTransactionsPack, decodedTransactions...)
}

if len(decodedEventsPack) > 0 {
// Write events to user RDS
customer.Pgx.WriteEvents(
d.blockchain,
decodedEventsPack,
)
}

// // Transactions

// Write transactions to user RDS
if len(decodedTransactionsPack) > 0 {
customer.Pgx.WriteTransactions(
d.blockchain,
decodedTransactionsPack,
)
}
customer.Pgx.WriteLabes(d.blockchain, decodedTransactionsPack, decodedEventsPack)

<-sem
}(update)
Expand Down
Loading