From d85c003acd7f9d2ca9494acd12bd5229bf65f2fe Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 16 Jul 2024 22:55:50 +0300 Subject: [PATCH 1/2] Batch commit. --- crawler/crawler.go | 20 +++---- indexer/db.go | 111 ++++++++++++++++++++++++++++++----- indexer/upload.go | 51 ++-------------- synchronizer/synchronizer.go | 18 +----- 4 files changed, 108 insertions(+), 92 deletions(-) diff --git a/crawler/crawler.go b/crawler/crawler.go index b318567..89cc0a5 100644 --- a/crawler/crawler.go +++ b/crawler/crawler.go @@ -126,35 +126,29 @@ func (c *Crawler) PushPackOfData(blocksBufferPack *bytes.Buffer, blocksIndexPack log.Printf("Saved .proto blocks with transactions and events to %s", packRange) // Save indexes data - var interfaceBlocksIndexPack []interface{} + var interfaceBlocksIndexPack []indexer.BlockIndex for _, v := range blocksIndexPack { v.Path = filepath.Join(c.basePath, packRange, "data.proto") interfaceBlocksIndexPack = append(interfaceBlocksIndexPack, v) } - var interfaceTxsIndexPack []interface{} + var interfaceTxsIndexPack []indexer.TransactionIndex for _, v := range txsIndexPack { v.Path = filepath.Join(c.basePath, packRange, "data.proto") interfaceTxsIndexPack = append(interfaceTxsIndexPack, v) } - var interfaceEventsIndexPack []interface{} + var interfaceEventsIndexPack []indexer.LogIndex for _, v := range eventsIndexPack { v.Path = filepath.Join(c.basePath, packRange, "data.proto") interfaceEventsIndexPack = append(interfaceEventsIndexPack, v) } - // TODO: Unite in one commit - if err := indexer.WriteIndexesToDatabase(c.blockchain, interfaceBlocksIndexPack, "block"); err != nil { - return fmt.Errorf("failed to write block index to database: %w", err) - } - - if err := indexer.WriteIndexesToDatabase(c.blockchain, interfaceTxsIndexPack, "transaction"); err != nil { - return fmt.Errorf("failed to write transaction index to database: %w", err) - } + // Write indexes to database + err := indexer.WriteIndicesToDatabase(c.blockchain, interfaceBlocksIndexPack, interfaceTxsIndexPack, interfaceEventsIndexPack) - if err := indexer.WriteIndexesToDatabase(c.blockchain, interfaceEventsIndexPack, "log"); err != nil { - return fmt.Errorf("failed to write event index to database: %w", err) + if err != nil { + return fmt.Errorf("failed to write indices to database: %w", err) } return nil diff --git a/indexer/db.go b/indexer/db.go index 3630808..5cd8b1b 100644 --- a/indexer/db.go +++ b/indexer/db.go @@ -289,8 +289,9 @@ func updateValues(valuesMap map[string]UnnestInsertValueStruct, key string, valu valuesMap[key] = tmp } -// Batch insert -func (p *PostgreSQLpgx) executeBatchInsert(ctx context.Context, tableName string, columns []string, values map[string]UnnestInsertValueStruct, conflictClause string) error { +func (p *PostgreSQLpgx) WriteIndexes(blockchain string, blocksIndexPack []BlockIndex, transactionsIndexPack []TransactionIndex, logsIndexPack []LogIndex) error { + + ctx := context.Background() pool := p.GetPool() conn, err := pool.Acquire(ctx) if err != nil { @@ -314,6 +315,36 @@ func (p *PostgreSQLpgx) executeBatchInsert(ctx context.Context, tableName string } }() + // Write blocks index + if len(blocksIndexPack) > 0 { + err = p.writeBlockIndexToDB(tx, blockchain, blocksIndexPack) + if err != nil { + return err + } + } + + // Write transactions index + if len(transactionsIndexPack) > 0 { + err = p.writeTransactionIndexToDB(tx, blockchain, transactionsIndexPack) + if err != nil { + return err + } + } + + // Write logs index + if len(logsIndexPack) > 0 { + err = p.writeLogIndexToDB(tx, blockchain, logsIndexPack) + if err != nil { + return err + } + } + + return nil +} + +// Batch insert +func (p *PostgreSQLpgx) executeBatchInsert(tx pgx.Tx, ctx context.Context, tableName string, columns []string, values map[string]UnnestInsertValueStruct, conflictClause string) error { + types := make([]string, 0) for index, column := range columns { @@ -339,7 +370,7 @@ func (p *PostgreSQLpgx) executeBatchInsert(ctx context.Context, tableName string return nil } -func (p *PostgreSQLpgx) writeBlockIndexToDB(blockchain string, indexes []BlockIndex) error { +func (p *PostgreSQLpgx) writeBlockIndexToDB(tx pgx.Tx, blockchain string, indexes []BlockIndex) error { tableName := blockchain + "_blocks" isBlockchainWithL1Chain := IsBlockchainWithL1Chain(blockchain) columns := []string{"block_number", "block_hash", "block_timestamp", "parent_hash", "row_id", "path"} @@ -399,18 +430,18 @@ func (p *PostgreSQLpgx) writeBlockIndexToDB(blockchain string, indexes []BlockIn } ctx := context.Background() - err = p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (block_number) DO NOTHING") + err = p.executeBatchInsert(tx, ctx, tableName, columns, valuesMap, "ON CONFLICT (block_number) DO NOTHING") if err != nil { return err } - log.Printf("Saved %d records into %s table", len(indexes), tableName) + log.Printf("Add %d records to transaction into %s table", len(indexes), tableName) return nil } -func (p *PostgreSQLpgx) writeTransactionIndexToDB(tableName string, indexes []TransactionIndex) error { +func (p *PostgreSQLpgx) writeTransactionIndexToDB(tx pgx.Tx, tableName string, indexes []TransactionIndex) error { columns := []string{"block_number", "block_hash", "hash", "index", "type", "from_address", "to_address", "selector", "row_id", "path"} var valuesMap = make(map[string]UnnestInsertValueStruct) @@ -495,18 +526,18 @@ func (p *PostgreSQLpgx) writeTransactionIndexToDB(tableName string, indexes []Tr ctx := context.Background() - err = p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (hash) DO NOTHING") + err = p.executeBatchInsert(tx, ctx, tableName, columns, valuesMap, "ON CONFLICT (hash) DO NOTHING") if err != nil { return err } - log.Printf("Saved %d records into %s table", len(indexes), tableName) + log.Printf("Add %d records to transaction into %s table", len(indexes), tableName) return nil } -func (p *PostgreSQLpgx) writeLogIndexToDB(tableName string, indexes []LogIndex) error { +func (p *PostgreSQLpgx) writeLogIndexToDB(tx pgx.Tx, tableName string, indexes []LogIndex) error { columns := []string{"transaction_hash", "block_hash", "address", "selector", "topic1", "topic2", "topic3", "row_id", "log_index", "path"} @@ -584,13 +615,13 @@ func (p *PostgreSQLpgx) writeLogIndexToDB(tableName string, indexes []LogIndex) } ctx := context.Background() - err = p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash, log_index) DO NOTHING") + err = p.executeBatchInsert(tx, ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash, log_index) DO NOTHING") if err != nil { return err } - log.Printf("Saved %d records into %s table", len(indexes), tableName) + log.Printf("Add %d records to transaction into %s table", len(indexes), tableName) return nil } @@ -1027,7 +1058,57 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, toBlock } -func (p *PostgreSQLpgx) WriteEvents(blockchain string, events []EventLabel) error { +func (p *PostgreSQLpgx) WriteLabes( + blockchain string, + transactions []TransactionLabel, + events []EventLabel, +) error { + + pool := p.GetPool() + + conn, err := pool.Acquire(context.Background()) + + if err != nil { + return err + } + + defer conn.Release() + + tx, err := conn.Begin(context.Background()) + + if err != nil { + return err + } + + defer func() { + if err := recover(); err != nil { + tx.Rollback(context.Background()) + panic(err) + } else if err != nil { + tx.Rollback(context.Background()) + } else { + err = tx.Commit(context.Background()) + } + }() + + if len(transactions) > 0 { + err := p.WriteTransactions(tx, blockchain, transactions) + if err != nil { + return err + } + } + + if len(events) > 0 { + err := p.WriteEvents(tx, blockchain, events) + if err != nil { + return err + } + } + + return nil +} + +func (p *PostgreSQLpgx) WriteEvents(tx pgx.Tx, blockchain string, events []EventLabel) error { tableName := LabelsTableName(blockchain) columns := []string{"id", "label", "transaction_hash", "log_index", "block_number", "block_hash", "block_timestamp", "caller_address", "origin_address", "address", "label_name", "label_type", "label_data"} @@ -1138,7 +1219,7 @@ func (p *PostgreSQLpgx) WriteEvents(blockchain string, events []EventLabel) erro ctx := context.Background() - err := p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash, log_index) where label='seer' and label_type = 'event' DO NOTHING") + err := p.executeBatchInsert(tx, ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash, log_index) where label='seer' and label_type = 'event' DO NOTHING") if err != nil { return err @@ -1149,7 +1230,7 @@ func (p *PostgreSQLpgx) WriteEvents(blockchain string, events []EventLabel) erro return nil } -func (p *PostgreSQLpgx) WriteTransactions(blockchain string, transactions []TransactionLabel) error { +func (p *PostgreSQLpgx) WriteTransactions(tx pgx.Tx, blockchain string, transactions []TransactionLabel) error { tableName := LabelsTableName(blockchain) columns := []string{"id", "address", "block_number", "block_hash", "caller_address", "label_name", "label_type", "origin_address", "label", "transaction_hash", "label_data", "block_timestamp"} @@ -1254,7 +1335,7 @@ func (p *PostgreSQLpgx) WriteTransactions(blockchain string, transactions []Tran ctx := context.Background() - err := p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash) WHERE label='seer' AND label_type='tx_call' DO NOTHING") + err := p.executeBatchInsert(tx, ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash) WHERE label='seer' AND label_type='tx_call' DO NOTHING") if err != nil { return err diff --git a/indexer/upload.go b/indexer/upload.go index 8ab38f4..0c2bf12 100644 --- a/indexer/upload.go +++ b/indexer/upload.go @@ -2,7 +2,6 @@ package indexer import ( "encoding/json" - "errors" "fmt" "io" "os" @@ -146,51 +145,9 @@ func filterDuplicates(indexType string, existingIndices, newIndices []interface{ return nil } -func WriteIndexesToDatabase(blockchain string, indexes []interface{}, indexType string) error { - // Implement logic to write indexes to the database +// WriteIndicesToDatabase writes the given indices to the database +func WriteIndicesToDatabase(blockchain string, blocks []BlockIndex, transactions []TransactionIndex, logs []LogIndex) error { + // Write block indices - if len(indexes) == 0 { - return nil - } - - switch indexType { - case "block": - var blockIndexes []BlockIndex - for _, i := range indexes { - index, ok := i.(BlockIndex) - if !ok { - return errors.New("invalid type for block index") - } - index.chain = blockchain - blockIndexes = append(blockIndexes, index) - } - return DBConnection.writeBlockIndexToDB(blockchain, blockIndexes) - - case "transaction": - var transactionIndexes []TransactionIndex - for _, i := range indexes { - index, ok := i.(TransactionIndex) - if !ok { - return errors.New("invalid type for transaction index") - } - index.chain = blockchain - transactionIndexes = append(transactionIndexes, index) - } - return DBConnection.writeTransactionIndexToDB(blockchain+"_transactions", transactionIndexes) - - case "log": - var logIndexes []LogIndex - for _, i := range indexes { - index, ok := i.(LogIndex) - if !ok { - return errors.New("invalid type for log index") - } - index.chain = blockchain - logIndexes = append(logIndexes, index) - } - return DBConnection.writeLogIndexToDB(blockchain+"_logs", logIndexes) - - default: - return errors.New("unsupported index type") - } + return DBConnection.WriteIndexes(blockchain, blocks, transactions, logs) } diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 84ae920..cf5e280 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -399,23 +399,7 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { decodedTransactionsPack = append(decodedTransactionsPack, decodedTransactions...) } - if len(decodedEventsPack) > 0 { - // Write events to user RDS - customer.Pgx.WriteEvents( - d.blockchain, - decodedEventsPack, - ) - } - - // // Transactions - - // Write transactions to user RDS - if len(decodedTransactionsPack) > 0 { - customer.Pgx.WriteTransactions( - d.blockchain, - decodedTransactionsPack, - ) - } + customer.Pgx.WriteLabes(d.blockchain, decodedTransactionsPack, decodedEventsPack) <-sem }(update) From 75b2b1706dfda021794dde02d9a05957d1489c68 Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 17 Jul 2024 13:58:58 +0300 Subject: [PATCH 2/2] Fix names inserts. --- indexer/db.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/indexer/db.go b/indexer/db.go index 5cd8b1b..a842834 100644 --- a/indexer/db.go +++ b/indexer/db.go @@ -362,6 +362,8 @@ func (p *PostgreSQLpgx) executeBatchInsert(tx pgx.Tx, ctx context.Context, table valuesSlice = append(valuesSlice, values[column].Values) } + // track execution time + if _, err := tx.Exec(ctx, query, valuesSlice...); err != nil { fmt.Println("Error executing bulk insert", err) return fmt.Errorf("error executing bulk insert for batch: %w", err) @@ -371,7 +373,7 @@ func (p *PostgreSQLpgx) executeBatchInsert(tx pgx.Tx, ctx context.Context, table } func (p *PostgreSQLpgx) writeBlockIndexToDB(tx pgx.Tx, blockchain string, indexes []BlockIndex) error { - tableName := blockchain + "_blocks" + tableName := BlocksTableName(blockchain) isBlockchainWithL1Chain := IsBlockchainWithL1Chain(blockchain) columns := []string{"block_number", "block_hash", "block_timestamp", "parent_hash", "row_id", "path"} @@ -441,7 +443,9 @@ func (p *PostgreSQLpgx) writeBlockIndexToDB(tx pgx.Tx, blockchain string, indexe return nil } -func (p *PostgreSQLpgx) writeTransactionIndexToDB(tx pgx.Tx, tableName string, indexes []TransactionIndex) error { +func (p *PostgreSQLpgx) writeTransactionIndexToDB(tx pgx.Tx, blockchain string, indexes []TransactionIndex) error { + + tableName := TransactionsTableName(blockchain) columns := []string{"block_number", "block_hash", "hash", "index", "type", "from_address", "to_address", "selector", "row_id", "path"} var valuesMap = make(map[string]UnnestInsertValueStruct) @@ -537,7 +541,9 @@ func (p *PostgreSQLpgx) writeTransactionIndexToDB(tx pgx.Tx, tableName string, i return nil } -func (p *PostgreSQLpgx) writeLogIndexToDB(tx pgx.Tx, tableName string, indexes []LogIndex) error { +func (p *PostgreSQLpgx) writeLogIndexToDB(tx pgx.Tx, blockchain string, indexes []LogIndex) error { + + tableName := LogsTableName(blockchain) columns := []string{"transaction_hash", "block_hash", "address", "selector", "topic1", "topic2", "topic3", "row_id", "log_index", "path"} @@ -615,6 +621,7 @@ func (p *PostgreSQLpgx) writeLogIndexToDB(tx pgx.Tx, tableName string, indexes [ } ctx := context.Background() + err = p.executeBatchInsert(tx, ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash, log_index) DO NOTHING") if err != nil {