From 2317b0c379e4e28767a046668dc437d6a1583f4b Mon Sep 17 00:00:00 2001
From: Andrey
Date: Wed, 10 Jul 2024 16:18:16 +0300
Subject: [PATCH] Add unnest insert instead of batch insert.

---
 indexer/db.go                | 660 +++++++++++++++++++++--------------
 indexer/settings.go          |   1 +
 synchronizer/synchronizer.go |   4 +-
 3 files changed, 407 insertions(+), 258 deletions(-)

diff --git a/indexer/db.go b/indexer/db.go
index 10d946d..63a493c 100644
--- a/indexer/db.go
+++ b/indexer/db.go
@@ -46,6 +46,12 @@ func hexStringToInt(hexString string) (int64, error) {
 	return intValue, nil
 }
 
+// https://klotzandrew.com/blog/postgres-passing-65535-parameter-limit/ instead of batching
+type UnnestInsertValueStruct struct {
+	Type   string        `json:"type"`   // e.g. "BIGINT" or "TEXT" or any other PostgreSQL data type
+	Values []interface{} `json:"values"` // e.g. [1, 2, 3, 4, 5]
+}
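+
+// For illustration, one populated column might look like this (hypothetical
+// values):
+//
+//	UnnestInsertValueStruct{Type: "BIGINT", Values: []interface{}{18000000, 18000001}}
+//
+// Each column is bound as a single PostgreSQL array parameter, so the
+// parameter count grows with the number of columns rather than the number
+// of rows.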
+
 func IsBlockchainWithL1Chain(blockchain string) bool {
 	switch blockchain {
 	case "ethereum":
@@ -269,238 +275,318 @@ func (p *PostgreSQLpgx) ReadLastLabel(blockchain string) (uint64, error) {
 	return label, nil
 }
 
-func (p *PostgreSQLpgx) writeBlockIndexToDB(blockchain string, indexes []BlockIndex) error {
-	tableName := blockchain + "_blocks"
-	pool := p.GetPool()
-
-	conn, err := pool.Acquire(context.Background())
-	if err != nil {
-		fmt.Println("Connection error", err)
-		return err
-	}
-	defer conn.Release()
-
-	// Start building the bulk insert query
-	var query string
-	isBlockchainWithL1Chain := IsBlockchainWithL1Chain(blockchain)
-	if isBlockchainWithL1Chain {
-		query = fmt.Sprintf("INSERT INTO %s (block_number,block_hash,block_timestamp,parent_hash,row_id,path,l1_block_number) VALUES ", tableName)
-	} else {
-		query = fmt.Sprintf("INSERT INTO %s (block_number,block_hash,block_timestamp,parent_hash,row_id,path) VALUES ", tableName)
-	}
-
-	// Placeholder slice for query parameters
-	var params []interface{}
-
-	// Loop through indexes to append values and parameters
-	var indexesLen int
-	for i, index := range indexes {
-		if isBlockchainWithL1Chain {
-			query += fmt.Sprintf("( $%d, $%d, $%d, $%d, $%d, $%d, $%d),", i*7+1, i*7+2, i*7+3, i*7+4, i*7+5, i*7+6, i*7+7)
-			params = append(params, index.BlockNumber, index.BlockHash, index.BlockTimestamp, index.ParentHash, index.RowID, index.Path, index.L1BlockNumber)
-		} else {
-			query += fmt.Sprintf("( $%d, $%d, $%d, $%d, $%d, $%d),", i*6+1, i*6+2, i*6+3, i*6+4, i*6+5, i*6+6)
-			params = append(params, index.BlockNumber, index.BlockHash, index.BlockTimestamp, index.ParentHash, index.RowID, index.Path)
-		}
-		indexesLen++
-	}
-
-	// Remove the last comma from the query
-	query = query[:len(query)-1]
-
-	// Add the ON CONFLICT clause - adjust based on your conflict resolution strategy
-	// For example, to do nothing on conflict with the 'id' column
-	query += " ON CONFLICT (block_number) DO NOTHING"
-
-	// Execute the query
-	_, err = conn.Exec(context.Background(), query, params...)
-	if err != nil {
-		fmt.Println("Error executing bulk insert", err)
-		return err
-	}
-
-	log.Printf("Saved %d records into %s table", indexesLen, tableName)
-	return nil
-}
 
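+// decodeAddress hex-decodes an "0x"-prefixed address string into raw bytes
+// for BYTEA columns. Addresses shorter than two characters are stored as the
+// single null byte 0x00, mirroring the inline logic this replaces.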
+func decodeAddress(address string) ([]byte, error) {
+	if len(address) < 2 {
+		return []byte{0x00}, nil
+	}
+	return hex.DecodeString(address[2:])
+}
+
+// updateValues updates the values in the map for a given key
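+// (map values are structs held by value, so valuesMap[key].Values cannot be
+// appended to in place; the value is copied, grown, and written back)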
+func updateValues(valuesMap map[string]UnnestInsertValueStruct, key string, value interface{}) {
+	tmp := valuesMap[key]
+	tmp.Values = append(tmp.Values, value)
+	valuesMap[key] = tmp
+}
+
+// executeBatchInsert writes all rows in one statement, passing each column as
+// a PostgreSQL array that unnest() zips back into rows
+func (p *PostgreSQLpgx) executeBatchInsert(ctx context.Context, tableName string, columns []string, values map[string]UnnestInsertValueStruct, conflictClause string) (err error) {
+	pool := p.GetPool()
+	conn, err := pool.Acquire(ctx)
+	if err != nil {
+		fmt.Println("Connection error", err)
+		return err
+	}
+	defer conn.Release()
+
+	tx, err := conn.Begin(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to begin transaction: %w", err)
+	}
+
+	// Ensure the transaction is either committed or rolled back; err is the
+	// named return value, so the deferred Commit result is propagated
+	defer func() {
+		if r := recover(); r != nil {
+			tx.Rollback(ctx)
+			panic(r) // re-throw panic after Rollback
+		} else if err != nil {
+			tx.Rollback(ctx) // err is non-nil; don't change it
+		} else {
+			err = tx.Commit(ctx) // err is nil; if Commit returns an error, update err
+		}
+	}()
+
+	types := make([]string, 0)
+
+	for index, column := range columns {
+		// construct unnest($1::int[], $2::int[] ...)
+		types = append(types, fmt.Sprintf("$%d::%s[]", index+1, values[column].Type))
+	}
+
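+	// The generated statement has the following shape (sketch for an
+	// "ethereum_blocks" table; actual columns and types come from the caller):
+	//
+	//	INSERT INTO ethereum_blocks (block_number,block_hash,...)
+	//	SELECT * FROM unnest($1::BIGINT[], $2::TEXT[], ...)
+	//	ON CONFLICT (block_number) DO NOTHING
+	//
+	// unnest() over several arrays of equal length zips them into rows.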
valuesMap["selector"] = UnnestInsertValueStruct{ + Type: "TEXT", + Values: make([]interface{}, 0), + } - log.Printf("Saved %d records into %s table", indexesLen, tableName) + valuesMap["topic1"] = UnnestInsertValueStruct{ + Type: "TEXT", + Values: make([]interface{}, 0), + } + valuesMap["topic2"] = UnnestInsertValueStruct{ + Type: "TEXT", + Values: make([]interface{}, 0), } - if err != nil { + valuesMap["row_id"] = UnnestInsertValueStruct{ + Type: "BIGINT", + Values: make([]interface{}, 0), + } - log.Println("Error writing log index to database", err) + valuesMap["log_index"] = UnnestInsertValueStruct{ + Type: "BIGINT", + Values: make([]interface{}, 0), + } - return err + valuesMap["path"] = UnnestInsertValueStruct{ + Type: "TEXT", + Values: make([]interface{}, 0), + } + + for _, index := range indexes { + + toAddressBytes, err := decodeAddress(index.Address) + if err != nil { + fmt.Println("Error decoding address:", err, index) + continue + } + + updateValues(valuesMap, "transaction_hash", index.TransactionHash) + updateValues(valuesMap, "block_hash", index.BlockHash) + updateValues(valuesMap, "address", toAddressBytes) + updateValues(valuesMap, "selector", index.Selector) + updateValues(valuesMap, "topic1", index.Topic1) + updateValues(valuesMap, "topic2", index.Topic2) + updateValues(valuesMap, "row_id", index.RowID) + updateValues(valuesMap, "log_index", index.LogIndex) + updateValues(valuesMap, "path", index.Path) } - return nil + ctx := context.Background() + err = p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash, log_index) DO NOTHING") + + if err != nil { + return err + } + log.Printf("Saved %d records into %s table", len(indexes), tableName) + + return nil } // GetEdgeDBBlock fetch first or last block for specified blockchain @@ -708,7 +794,9 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, toBlock customer_id, abi_selector, abi_name, - abi + abi, + (abi)::jsonb ->> 'type' as abi_type, + (abi)::jsonb ->> 'stateMutability' as abi_stateMutability FROM abi_jobs WHERE @@ -756,7 +844,9 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, toBlock transactions.transaction_path FROM transactions - inner JOIN jobs ON transactions.transaction_address = jobs.address + inner JOIN jobs ON abi_type = 'function' + AND abi_stateMutability != 'view' + AND transactions.transaction_address = jobs.address AND transactions.transaction_selector = jobs.abi_selector ), abi_events AS ( @@ -774,7 +864,8 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, toBlock events.event_path FROM events - inner JOIN jobs ON events.event_address = jobs.address + inner JOIN jobs ON abi_type = 'event' + AND events.event_address = jobs.address AND events.event_selector = jobs.abi_selector ), combined AS ( @@ -934,145 +1025,202 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, toBlock } func (p *PostgreSQLpgx) WriteEvents(blockchain string, events []EventLabel) error { - pool := p.GetPool() + tableName := LabelsTableName(blockchain) + columns := []string{"id", "label", "transaction_hash", "log_index", "block_number", "block_hash", "block_timestamp", "caller_address", "origin_address", "address", "label_name", "label_type", "label_data"} + var valuesMap = make(map[string]UnnestInsertValueStruct) - // Create a context with a timeout - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() + valuesMap["id"] = UnnestInsertValueStruct{ + Type: "UUID", + 
+
+	if _, err := tx.Exec(ctx, query, valuesSlice...); err != nil {
+		fmt.Println("Error executing bulk insert", err)
+		return fmt.Errorf("error executing bulk insert for batch: %w", err)
+	}
+
+	return nil
+}
 
-func (p *PostgreSQLpgx) writeTransactionIndexToDB(tableName string, indexes []TransactionIndex) error {
-	pool := p.GetPool()
-
-	conn, err := pool.Acquire(context.Background())
-	if err != nil {
-		return err
-	}
-	defer conn.Release()
-
-	// Start building the bulk insert query
-	query := fmt.Sprintf("INSERT INTO %s (block_number, block_hash, hash, index, type, from_address, to_address, selector, row_id, path) VALUES ", tableName)
-
-	// Placeholder slice for query parameters
-	var params []interface{}
-
-	// Loop through indexes to append values and parameters
-	var toAddressBytes, fromAddressBytes []byte
-
-	var indexesLen int
-	for i, index := range indexes {
-		query += fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d),", i*10+1, i*10+2, i*10+3, i*10+4, i*10+5, i*10+6, i*10+7, i*10+8, i*10+9, i*10+10)
-
-		// Decode to_address
-		if len(index.ToAddress) < 2 {
-			// /x00 is the null byte
-			toAddressBytes = []byte{0x00}
-		} else {
-			toAddressBytes, err = hex.DecodeString(index.ToAddress[2:]) // Remove the '0x' prefix before conversion
-			if err != nil {
-				fmt.Println("Error decoding to_address:", err, index)
-				continue
-			}
-		}
-
-		// Decode from_address
-		if len(index.FromAddress) < 2 {
-			fromAddressBytes = []byte{0x00}
-		} else {
-			fromAddressBytes, err = hex.DecodeString(index.FromAddress[2:])
-			if err != nil {
-				fmt.Println("Error decoding from_address:", err, index)
-				continue
-			}
-		}
-
-		// Append the parameters for this record
-		params = append(params, index.BlockNumber, index.BlockHash, index.TransactionHash, index.TransactionIndex, index.Type, fromAddressBytes, toAddressBytes, index.Selector, index.RowID, index.Path)
-		indexesLen++
-	}
-
-	// Remove the last comma from the query
-	query = query[:len(query)-1]
-
-	// Add the ON CONFLICT clause - adjust based on your conflict resolution strategy
-	query += " ON CONFLICT (hash) DO NOTHING"
-
-	// Execute the query
-	_, err = conn.Exec(context.Background(), query, params...)
-	if err != nil {
-		fmt.Println("Error executing bulk insert", err)
-		return err
-	}
-
-	log.Printf("Saved %d records into %s table", indexesLen, tableName)
-
-	return nil
-}
 
+func (p *PostgreSQLpgx) writeBlockIndexToDB(blockchain string, indexes []BlockIndex) error {
+	tableName := blockchain + "_blocks"
+	isBlockchainWithL1Chain := IsBlockchainWithL1Chain(blockchain)
+	columns := []string{"block_number", "block_hash", "block_timestamp", "parent_hash", "row_id", "path"}
+
+	valuesMap := make(map[string]UnnestInsertValueStruct)
+
+	valuesMap["block_number"] = UnnestInsertValueStruct{
+		Type:   "BIGINT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["block_hash"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["block_timestamp"] = UnnestInsertValueStruct{
+		Type:   "BIGINT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["parent_hash"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["row_id"] = UnnestInsertValueStruct{
+		Type:   "BIGINT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["path"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	if isBlockchainWithL1Chain {
+		columns = append(columns, "l1_block_number")
+		valuesMap["l1_block_number"] = UnnestInsertValueStruct{
+			Type:   "BIGINT",
+			Values: make([]interface{}, 0),
+		}
+	}
+
+	for _, index := range indexes {
+		updateValues(valuesMap, "block_number", index.BlockNumber)
+		updateValues(valuesMap, "block_hash", index.BlockHash)
+		updateValues(valuesMap, "block_timestamp", index.BlockTimestamp)
+		updateValues(valuesMap, "parent_hash", index.ParentHash)
+		updateValues(valuesMap, "row_id", index.RowID)
+		updateValues(valuesMap, "path", index.Path)
+
+		if isBlockchainWithL1Chain {
+			updateValues(valuesMap, "l1_block_number", index.L1BlockNumber)
+		}
+	}
+
+	ctx := context.Background()
+	err := p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (block_number) DO NOTHING")
+	if err != nil {
+		return err
+	}
+
+	log.Printf("Saved %d records into %s table", len(indexes), tableName)
+
+	return nil
+}
 
-func (p *PostgreSQLpgx) writeLogIndexToDB(tableName string, indexes []LogIndex) error {
-	pool := p.GetPool()
-
-	// Create a context with a timeout
-	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
-	defer cancel()
-
-	conn, err := pool.Acquire(ctx)
-	if err != nil {
-		fmt.Println("Connection error", err)
-		return err
-	}
-	defer conn.Release()
-
-	// Start a transaction
-	tx, err := conn.Begin(ctx)
-	if err != nil {
-		return fmt.Errorf("failed to begin transaction: %w", err)
-	}
-
-	// Ensure the transaction is either committed or rolled back
-	defer func() {
-		if err := recover(); err != nil {
-			tx.Rollback(ctx)
-			panic(err) // re-throw panic after Rollback
-		} else if err != nil {
-			tx.Rollback(ctx) // err is non-nil; don't change it
-		} else {
-			err = tx.Commit(ctx) // err is nil; if Commit returns error update err
-		}
-	}()
-
-	// Define the batch size
-	var addressBytes []byte
-
-	for i := 0; i < len(indexes); i += InsertBatchSize {
-		// Determine the end of the current batch
-		end := i + InsertBatchSize
-		if end > len(indexes) {
-			end = len(indexes)
-		}
-
-		// Start building the bulk insert query
-		query := fmt.Sprintf("INSERT INTO %s (transaction_hash, block_hash, address, selector, topic1, topic2, row_id, log_index, path) VALUES ", tableName)
-
-		// Placeholder slice for query parameters
-		var params []interface{}
-
-		// Loop through indexes to append values and parameters
-		var indexesLen int
-		for i, index := range indexes[i:end] {
-			query += fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d),", i*9+1, i*9+2, i*9+3, i*9+4, i*9+5, i*9+6, i*9+7, i*9+8, i*9+9)
-
-			if len(index.Address) < 2 {
-				// /x00 is the null byte
-				addressBytes = []byte{0x00}
-			} else {
-				addressBytes, err = hex.DecodeString(index.Address[2:]) // Remove the '0x' prefix before conversion
-				if err != nil {
-					fmt.Println("Error decoding address", err, index)
-					continue
-				}
-			}
-
-			params = append(params, index.TransactionHash, index.BlockHash, addressBytes, index.Selector, index.Topic1, index.Topic2, index.RowID, index.LogIndex, index.Path)
-			indexesLen++
-		}
-
-		query = query[:len(query)-1]
-
-		// Add the ON CONFLICT clause - adjust based on your conflict resolution strategy
-		query += " ON CONFLICT (transaction_hash, log_index) DO NOTHING"
-
-		if _, err := tx.Exec(context.Background(), query, params...); err != nil {
-			fmt.Println("Error executing bulk insert", err)
-			return fmt.Errorf("error executing bulk insert for batch: %w", err)
-		}
-
-		log.Printf("Saved %d records into %s table", indexesLen, tableName)
-	}
-
-	if err != nil {
-		log.Println("Error writing log index to database", err)
-		return err
-	}
-
-	return nil
-}
 
+func (p *PostgreSQLpgx) writeTransactionIndexToDB(tableName string, indexes []TransactionIndex) error {
+	columns := []string{"block_number", "block_hash", "hash", "index", "type", "from_address", "to_address", "selector", "row_id", "path"}
+
+	valuesMap := make(map[string]UnnestInsertValueStruct)
+
+	valuesMap["block_number"] = UnnestInsertValueStruct{
+		Type:   "BIGINT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["block_hash"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["hash"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["index"] = UnnestInsertValueStruct{
+		Type:   "BIGINT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["type"] = UnnestInsertValueStruct{
+		Type:   "INT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["from_address"] = UnnestInsertValueStruct{
+		Type:   "BYTEA",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["to_address"] = UnnestInsertValueStruct{
+		Type:   "BYTEA",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["selector"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["row_id"] = UnnestInsertValueStruct{
+		Type:   "BIGINT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["path"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	for _, index := range indexes {
+		fromAddressBytes, err := decodeAddress(index.FromAddress)
+		if err != nil {
+			fmt.Println("Error decoding from address:", err, index)
+			continue
+		}
+
+		toAddressBytes, err := decodeAddress(index.ToAddress)
+		if err != nil {
+			fmt.Println("Error decoding to address:", err, index)
+			continue
+		}
+
+		updateValues(valuesMap, "block_number", index.BlockNumber)
+		updateValues(valuesMap, "block_hash", index.BlockHash)
+		updateValues(valuesMap, "hash", index.TransactionHash)
+		updateValues(valuesMap, "index", index.TransactionIndex)
+		updateValues(valuesMap, "type", index.Type)
+		updateValues(valuesMap, "from_address", fromAddressBytes)
+		updateValues(valuesMap, "to_address", toAddressBytes)
+		updateValues(valuesMap, "selector", index.Selector)
+		updateValues(valuesMap, "row_id", index.RowID)
+		updateValues(valuesMap, "path", index.Path)
+	}
+
+	ctx := context.Background()
+	err := p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (hash) DO NOTHING")
+	if err != nil {
+		return err
+	}
+
+	log.Printf("Saved %d records into %s table", len(indexes), tableName)
+
+	return nil
+}
+
+func (p *PostgreSQLpgx) writeLogIndexToDB(tableName string, indexes []LogIndex) error {
+	columns := []string{"transaction_hash", "block_hash", "address", "selector", "topic1", "topic2", "row_id", "log_index", "path"}
+
+	valuesMap := make(map[string]UnnestInsertValueStruct)
+
+	valuesMap["transaction_hash"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["block_hash"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["address"] = UnnestInsertValueStruct{
+		Type:   "BYTEA",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["selector"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["topic1"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["topic2"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["row_id"] = UnnestInsertValueStruct{
+		Type:   "BIGINT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["log_index"] = UnnestInsertValueStruct{
+		Type:   "BIGINT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["path"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	for _, index := range indexes {
+		addressBytes, err := decodeAddress(index.Address)
+		if err != nil {
+			fmt.Println("Error decoding address:", err, index)
+			continue
+		}
+
+		updateValues(valuesMap, "transaction_hash", index.TransactionHash)
+		updateValues(valuesMap, "block_hash", index.BlockHash)
+		updateValues(valuesMap, "address", addressBytes)
+		updateValues(valuesMap, "selector", index.Selector)
+		updateValues(valuesMap, "topic1", index.Topic1)
+		updateValues(valuesMap, "topic2", index.Topic2)
+		updateValues(valuesMap, "row_id", index.RowID)
+		updateValues(valuesMap, "log_index", index.LogIndex)
+		updateValues(valuesMap, "path", index.Path)
+	}
+
+	ctx := context.Background()
+	err := p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash, log_index) DO NOTHING")
+	if err != nil {
+		return err
+	}
+
+	log.Printf("Saved %d records into %s table", len(indexes), tableName)
+
+	return nil
+}
 
 // GetEdgeDBBlock fetch first or last block for specified blockchain
@@ -708,7 +794,9 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, toBlock
 			customer_id,
 			abi_selector,
 			abi_name,
-			abi
+			abi,
+			(abi)::jsonb ->> 'type' as abi_type,
+			(abi)::jsonb ->> 'stateMutability' as abi_stateMutability
 		FROM
 			abi_jobs
 		WHERE
@@ -756,7 +844,9 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, toBlock
 				transactions.transaction_path
 			FROM
 				transactions
-				inner JOIN jobs ON transactions.transaction_address = jobs.address
+				inner JOIN jobs ON abi_type = 'function'
+				AND abi_stateMutability != 'view'
+				AND transactions.transaction_address = jobs.address
 				AND transactions.transaction_selector = jobs.abi_selector
 		),
 		abi_events AS (
@@ -774,7 +864,8 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, toBlock
 				events.event_path
 			FROM
 				events
-				inner JOIN jobs ON events.event_address = jobs.address
+				inner JOIN jobs ON abi_type = 'event'
+				AND events.event_address = jobs.address
 				AND events.event_selector = jobs.abi_selector
 		),
 		combined AS (
@@ -934,145 +1025,202 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, toBlock
 }
 
 func (p *PostgreSQLpgx) WriteEvents(blockchain string, events []EventLabel) error {
-	pool := p.GetPool()
-
-	// Create a context with a timeout
-	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
-	defer cancel()
-
-	conn, err := pool.Acquire(ctx)
-	if err != nil {
-		fmt.Println("Connection error", err)
-		return err
-	}
-	defer conn.Release()
-
-	// Start a transaction
-	tx, err := conn.Begin(ctx)
-	if err != nil {
-		return fmt.Errorf("failed to begin transaction: %w", err)
-	}
-
-	// Ensure the transaction is either committed or rolled back
-	defer func() {
-		if err := recover(); err != nil {
-			tx.Rollback(ctx)
-			panic(err) // re-throw panic after Rollback
-		} else if err != nil {
-			tx.Rollback(ctx) // err is non-nil; don't change it
-		} else {
-			err = tx.Commit(ctx) // err is nil; if Commit returns error update err
-		}
-	}()
-
-	tableName := LabelsTableName(blockchain)
-
-	// Too many parameters error
-	// Batch insert events calculated as parameters_amount_per_row*batch_size <= 65535 (max number of parameters in a single query)
-	for i := 0; i < len(events); i += InsertBatchSize {
-		// Determine the end of the current batch
-		end := i + InsertBatchSize
-		if end > len(events) {
-			end = len(events)
-		}
-
-		// Start building the bulk insert query
-		query := fmt.Sprintf("INSERT INTO %s (id, label, transaction_hash, log_index, block_number, block_hash, block_timestamp, caller_address, origin_address, address, label_name, label_type, label_data) VALUES ", tableName)
-
-		var params []interface{}
-
-		// Loop through labels to append values and parameters
-		for j, label := range events[i:end] {
-			id := uuid.New()
-			query += fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d),",
-				j*13+1, j*13+2, j*13+3, j*13+4, j*13+5, j*13+6, j*13+7, j*13+8, j*13+9, j*13+10, j*13+11, j*13+12, j*13+13)
-			params = append(params, id, label.Label, label.TransactionHash, label.LogIndex, label.BlockNumber, label.BlockHash, label.BlockTimestamp, label.CallerAddress, label.OriginAddress, label.Address, label.LabelName, label.LabelType, label.LabelData)
-		}
-
-		// Remove the last comma
-		query = query[:len(query)-1]
-
-		// ON CONFLICT clause - skip duplicates
-		query += " ON CONFLICT (transaction_hash, log_index) where label='seer' and label_type = 'event' DO NOTHING"
-
-		if _, err := tx.Exec(ctx, query, params...); err != nil {
-			fmt.Println("Error executing bulk insert", err)
-			return fmt.Errorf("error executing bulk insert for batch: %w", err)
-		}
-	}
-
-	log.Printf("Pushed %d events into %s", len(events), tableName)
+	tableName := LabelsTableName(blockchain)
+	columns := []string{"id", "label", "transaction_hash", "log_index", "block_number", "block_hash", "block_timestamp", "caller_address", "origin_address", "address", "label_name", "label_type", "label_data"}
+
+	valuesMap := make(map[string]UnnestInsertValueStruct)
+
+	valuesMap["id"] = UnnestInsertValueStruct{
+		Type:   "UUID",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["label"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["transaction_hash"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["log_index"] = UnnestInsertValueStruct{
+		Type:   "BIGINT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["block_number"] = UnnestInsertValueStruct{
+		Type:   "BIGINT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["block_hash"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["block_timestamp"] = UnnestInsertValueStruct{
+		Type:   "BIGINT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["caller_address"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["origin_address"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["address"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["label_name"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["label_type"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["label_data"] = UnnestInsertValueStruct{
+		Type:   "JSONB",
+		Values: make([]interface{}, 0),
+	}
+
+	for _, event := range events {
+		id := uuid.New()
+
+		updateValues(valuesMap, "id", id)
+		updateValues(valuesMap, "label", event.Label)
+		updateValues(valuesMap, "transaction_hash", event.TransactionHash)
+		updateValues(valuesMap, "log_index", event.LogIndex)
+		updateValues(valuesMap, "block_number", event.BlockNumber)
+		updateValues(valuesMap, "block_hash", event.BlockHash)
+		updateValues(valuesMap, "block_timestamp", event.BlockTimestamp)
+		updateValues(valuesMap, "caller_address", event.CallerAddress)
+		updateValues(valuesMap, "origin_address", event.OriginAddress)
+		updateValues(valuesMap, "address", event.Address)
+		updateValues(valuesMap, "label_name", event.LabelName)
+		updateValues(valuesMap, "label_type", event.LabelType)
+		updateValues(valuesMap, "label_data", event.LabelData)
+	}
+
+	ctx := context.Background()
+
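+	// The WHERE clause makes ON CONFLICT target a partial unique index on
+	// (transaction_hash, log_index) that is assumed to exist for rows with
+	// label='seer' and label_type='event'.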
+	err := p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash, log_index) WHERE label='seer' AND label_type = 'event' DO NOTHING")
+	if err != nil {
+		return err
+	}
+
+	log.Printf("Saved %d event records into %s table", len(events), tableName)
 
 	return nil
 }
 
 func (p *PostgreSQLpgx) WriteTransactions(blockchain string, transactions []TransactionLabel) error {
-	pool := p.GetPool()
-
-	// Create a context with a timeout
-	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
-	defer cancel()
-
-	conn, err := pool.Acquire(context.Background())
-	if err != nil {
-		return err
-	}
-	defer conn.Release()
-
-	// Start a transaction
-	tx, err := conn.Begin(ctx)
-	if err != nil {
-		return fmt.Errorf("failed to begin transaction: %w", err)
-	}
-
-	// Ensure the transaction is either committed or rolled back
-	defer func() {
-		if err := recover(); err != nil {
-			tx.Rollback(ctx)
-			panic(err) // re-throw panic after Rollback
-		} else if err != nil {
-			tx.Rollback(ctx) // err is non-nil; don't change it
-		} else {
-			err = tx.Commit(ctx) // err is nil; if Commit returns error update err
-		}
-	}()
-
-	tableName := LabelsTableName(blockchain)
-
-	query := fmt.Sprintf("INSERT INTO %s (id, address, block_number, block_hash, caller_address, label_name, label_type, origin_address, label, transaction_hash, label_data, block_timestamp) VALUES ", tableName)
-
-	// Placeholder slice for query parameters
-	var params []interface{}
-
-	for i := 0; i < len(transactions); i += InsertBatchSize {
-		// Determine the end of the current batch
-		end := i + InsertBatchSize
-		if end > len(transactions) {
-			end = len(transactions)
-		}
-
-		// Loop through transactions to append values and parameters
-		for row, label := range transactions[i:end] {
-			id := uuid.New()
-			query += fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d),",
-				row*12+1, row*12+2, row*12+3, row*12+4, row*12+5, row*12+6, row*12+7, row*12+8, row*12+9, row*12+10, row*12+11, row*12+12)
-			params = append(params, id, label.Address, label.BlockNumber, label.BlockHash, label.CallerAddress, label.LabelName, label.LabelType, label.OriginAddress, label.Label, label.TransactionHash, label.LabelData, label.BlockTimestamp)
-		}
-
-		// Remove the last comma from the query
-		query = query[:len(query)-1]
-
-		// Add the ON CONFLICT clause - skip duplicates
-		query += " ON CONFLICT (transaction_hash) where label='seer' and label_type = 'tx_call' DO NOTHING"
-
-		// Execute the query
-		_, err = conn.Exec(context.Background(), query, params...)
-		if err != nil {
-			log.Println("Error executing bulk insert", err)
-			return err
-		}
-	}
-
-	log.Printf("Pushed %d transactions into %s", len(transactions), tableName)
+	tableName := LabelsTableName(blockchain)
+	columns := []string{"id", "address", "block_number", "block_hash", "caller_address", "label_name", "label_type", "origin_address", "label", "transaction_hash", "label_data", "block_timestamp"}
+
+	valuesMap := make(map[string]UnnestInsertValueStruct)
+
+	valuesMap["id"] = UnnestInsertValueStruct{
+		Type:   "UUID",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["address"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["block_number"] = UnnestInsertValueStruct{
+		Type:   "BIGINT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["block_hash"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["caller_address"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["label_name"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["label_type"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["origin_address"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["label"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["transaction_hash"] = UnnestInsertValueStruct{
+		Type:   "TEXT",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["label_data"] = UnnestInsertValueStruct{
+		Type:   "JSONB",
+		Values: make([]interface{}, 0),
+	}
+
+	valuesMap["block_timestamp"] = UnnestInsertValueStruct{
+		Type:   "BIGINT",
+		Values: make([]interface{}, 0),
+	}
+
+	for _, transaction := range transactions {
+		id := uuid.New()
+
+		updateValues(valuesMap, "id", id)
+		updateValues(valuesMap, "address", transaction.Address)
+		updateValues(valuesMap, "block_number", transaction.BlockNumber)
+		updateValues(valuesMap, "block_hash", transaction.BlockHash)
+		updateValues(valuesMap, "caller_address", transaction.CallerAddress)
+		updateValues(valuesMap, "label_name", transaction.LabelName)
+		updateValues(valuesMap, "label_type", transaction.LabelType)
+		updateValues(valuesMap, "origin_address", transaction.OriginAddress)
+		updateValues(valuesMap, "label", transaction.Label)
+		updateValues(valuesMap, "transaction_hash", transaction.TransactionHash)
+		updateValues(valuesMap, "label_data", transaction.LabelData)
+		updateValues(valuesMap, "block_timestamp", transaction.BlockTimestamp)
+	}
+
+	ctx := context.Background()
+	err := p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash) WHERE label='seer' AND label_type='tx_call' DO NOTHING")
+	if err != nil {
+		return err
+	}
+
+	log.Printf("Saved %d transaction records into %s table", len(transactions), tableName)
 
 	return nil
 }
diff --git a/indexer/settings.go b/indexer/settings.go
index f024de8..247655b 100644
--- a/indexer/settings.go
+++ b/indexer/settings.go
@@ -7,6 +7,7 @@ import (
 
 var (
 	InsertBatchSize = 1000 // Number of rows to insert in a single batch parameters_amount_per_row * InsertBatchSize <= 65535
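+	// PostgreSQL allows at most 65535 bind parameters per statement; the
+	// unnest-based insert stays below this by binding one array per column.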
valuesMap["label_type"] = UnnestInsertValueStruct{ + Type: "TEXT", + Values: make([]interface{}, 0), + } - // Loop through transactions to append values and parameters - for row, label := range transactions[i:end] { - id := uuid.New() - query += fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d),", - row*12+1, row*12+2, row*12+3, row*12+4, row*12+5, row*12+6, row*12+7, row*12+8, row*12+9, row*12+10, row*12+11, row*12+12) - params = append(params, id, label.Address, label.BlockNumber, label.BlockHash, label.CallerAddress, label.LabelName, label.LabelType, label.OriginAddress, label.Label, label.TransactionHash, label.LabelData, label.BlockTimestamp) - } + valuesMap["origin_address"] = UnnestInsertValueStruct{ + Type: "TEXT", + Values: make([]interface{}, 0), + } - // Remove the last comma from the query - query = query[:len(query)-1] + valuesMap["label"] = UnnestInsertValueStruct{ + Type: "TEXT", + Values: make([]interface{}, 0), + } - // Add the ON CONFLICT clause - skip duplicates - query += " ON CONFLICT (transaction_hash) where label='seer' and label_type = 'tx_call' DO NOTHING" + valuesMap["transaction_hash"] = UnnestInsertValueStruct{ + Type: "TEXT", + Values: make([]interface{}, 0), + } - // Execute the query - _, err = conn.Exec(context.Background(), query, params...) - if err != nil { - log.Println("Error executing bulk insert", err) - return err - } + valuesMap["label_data"] = UnnestInsertValueStruct{ + Type: "jsonb", + Values: make([]interface{}, 0), } - log.Printf("Pushed %d transactions into %s", len(transactions), tableName) + valuesMap["block_timestamp"] = UnnestInsertValueStruct{ + Type: "BIGINT", + Values: make([]interface{}, 0), + } + + for _, transaction := range transactions { + + id := uuid.New() + + updateValues(valuesMap, "id", id) + updateValues(valuesMap, "address", transaction.Address) + updateValues(valuesMap, "block_number", transaction.BlockNumber) + updateValues(valuesMap, "block_hash", transaction.BlockHash) + updateValues(valuesMap, "caller_address", transaction.CallerAddress) + updateValues(valuesMap, "label_name", transaction.LabelName) + updateValues(valuesMap, "label_type", transaction.LabelType) + updateValues(valuesMap, "origin_address", transaction.OriginAddress) + updateValues(valuesMap, "label", transaction.Label) + updateValues(valuesMap, "transaction_hash", transaction.TransactionHash) + updateValues(valuesMap, "label_data", transaction.LabelData) + updateValues(valuesMap, "block_timestamp", transaction.BlockTimestamp) + + } + + ctx := context.Background() + + err := p.executeBatchInsert(ctx, tableName, columns, valuesMap, "ON CONFLICT (transaction_hash) WHERE label='seer' AND label_type='tx_call' DO NOTHING") + + if err != nil { + return err + } + + log.Printf("Saved %d transactions records into %s table", len(transactions), tableName) + return nil } diff --git a/indexer/settings.go b/indexer/settings.go index f024de8..247655b 100644 --- a/indexer/settings.go +++ b/indexer/settings.go @@ -7,6 +7,7 @@ import ( var ( InsertBatchSize = 1000 // Number of rows to insert in a single batch parameters_amount_per_row * InsertBatchSize <= 65535 + InsertMaxParametersPerBatch = 65535 SeerCrawlerLabel string MOONSTREAM_DB_V3_INDEXES_URI string ) diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 7d6cea4..0c02c8e 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -233,6 +233,7 @@ func (d *Synchronizer) Start(customerDbUriFlag string) { func (d *Synchronizer) 
SyncCycle(customerDbUriFlag string) (bool, error) { var isEnd bool + fmt.Println(customerDbUriFlag) customerDBConnections, customerIds, customersErr := d.getCustomers(customerDbUriFlag) if customersErr != nil { return isEnd, customersErr @@ -241,6 +242,7 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { if d.startBlock == 0 { var latestCustomerBlocks []uint64 for id, customer := range customerDBConnections { + pool := customer.Pgx.GetPool() conn, err := pool.Acquire(context.Background()) if err != nil { @@ -281,8 +283,6 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { return isEnd, idxLatestErr } - fmt.Println("Indexed latest block:", indexedLatestBlock) - if d.endBlock != 0 && indexedLatestBlock > d.endBlock { indexedLatestBlock = d.endBlock }