From 17c41cb0d9ed95af3e62e2ca7a31b5a522a8b6c2 Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Thu, 21 Sep 2023 09:11:28 +0200 Subject: [PATCH] (BIDS-2472) fix bug in sync stats, use tx for writing statistics --- db/bigtable.go | 10 ++-- db/statistics.go | 153 ++++++++++++++++++----------------------------- 2 files changed, 65 insertions(+), 98 deletions(-) diff --git a/db/bigtable.go b/db/bigtable.go index 7a5c7081fe..74d723d3e5 100644 --- a/db/bigtable.go +++ b/db/bigtable.go @@ -832,7 +832,7 @@ func (bigtable *Bigtable) SaveSyncComitteeDuties(duties map[types.Slot]map[types return err } - logger.Infof("exported sync committee duties to bigtable in %v", time.Since(start)) + logger.Infof("exported %v sync committee duties to bigtable in %v", len(muts), time.Since(start)) return nil } @@ -1407,7 +1407,6 @@ func (bigtable *Bigtable) GetValidatorSyncDutiesHistory(validators []uint64, sta ranges := bigtable.getValidatorSlotRanges(vals, SYNC_COMMITTEES_FAMILY, startSlot, endSlot) err := bigtable.tableValidatorsHistory.ReadRows(ctx, ranges, func(r gcp_bigtable.Row) bool { - keySplit := strings.Split(r.Key(), ":") validator, err := bigtable.validatorKeyToIndex(keySplit[1]) @@ -1421,6 +1420,8 @@ func (bigtable *Bigtable) GetValidatorSyncDutiesHistory(validators []uint64, sta return false } slot = MAX_CL_BLOCK_NUMBER - slot + + logger.Info(slot) for _, ri := range r[SYNC_COMMITTEES_FAMILY] { inclusionSlot := MAX_CL_BLOCK_NUMBER - uint64(ri.Timestamp)/1000 @@ -1501,7 +1502,8 @@ func (bigtable *Bigtable) GetValidatorMissedAttestationsCount(validators []uint6 } func (bigtable *Bigtable) GetValidatorSyncDutiesStatistics(validators []uint64, startEpoch uint64, endEpoch uint64) (map[uint64]*types.ValidatorSyncDutiesStatistic, error) { - data, err := bigtable.GetValidatorSyncDutiesHistory(validators, startEpoch, endEpoch) + + data, err := bigtable.GetValidatorSyncDutiesHistory(validators, startEpoch*utils.Config.Chain.Config.SlotsPerEpoch, ((endEpoch+1)*utils.Config.Chain.Config.SlotsPerEpoch)-1) if err != nil { return nil, err @@ -2206,7 +2208,7 @@ func (bigtable *Bigtable) MigrateIncomeDataV1V2Schema(epoch uint64) error { filter := gcp_bigtable.ChainFilters(gcp_bigtable.FamilyFilter(INCOME_DETAILS_COLUMN_FAMILY), gcp_bigtable.LatestNFilter(1)) ctx := context.Background() - prefixEpochRange := gcp_bigtable.PrefixRange(fmt.Sprintf("%s:e:b:%s", bigtable.chainId, fmt.Sprintf("%09d", (MAX_EPOCH+1)-epoch))) + prefixEpochRange := gcp_bigtable.PrefixRange(fmt.Sprintf("%s:e:b:%s", bigtable.chainId, fmt.Sprintf("%09d", (MAX_EPOCH)-epoch))) err := bigtable.tableBeaconchain.ReadRows(ctx, prefixEpochRange, func(r gcp_bigtable.Row) bool { // logger.Infof("processing row %v", r.Key()) diff --git a/db/statistics.go b/db/statistics.go index 1f487935f5..ce863d413e 100644 --- a/db/statistics.go +++ b/db/statistics.go @@ -14,6 +14,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/jmoiron/sqlx" "github.com/lib/pq" "github.com/shopspring/decimal" "github.com/sirupsen/logrus" @@ -51,7 +52,13 @@ func WriteValidatorStatisticsForDay(day uint64, concurrencyTotal uint64, concurr } exported := Exported{} - err := ReaderDb.Get(&exported, ` + tx, err := WriterDb.Beginx() + if err != nil { + return fmt.Errorf("error starting transaction: %w", err) + } + defer tx.Rollback() + + err = tx.Get(&exported, ` SELECT status, failed_attestations_exported, @@ -90,77 +97,77 @@ func WriteValidatorStatisticsForDay(day uint64, concurrencyTotal uint64, concurr if exported.FailedAttestations { logger.Infof("Skipping failed attestations") - } else if err := WriteValidatorFailedAttestationsStatisticsForDay(validators, day, concurrencyFailedAttestations); err != nil { + } else if err := WriteValidatorFailedAttestationsStatisticsForDay(validators, day, concurrencyFailedAttestations, tx); err != nil { return fmt.Errorf("error in WriteValidatorFailedAttestationsStatisticsForDay: %w", err) } if exported.SyncDuties { logger.Infof("Skipping sync duties") - } else if err := WriteValidatorSyncDutiesForDay(validators, day); err != nil { + } else if err := WriteValidatorSyncDutiesForDay(validators, day, tx); err != nil { return fmt.Errorf("error in WriteValidatorSyncDutiesForDay: %w", err) } if exported.WithdrawalsDeposits { logger.Infof("Skipping withdrawals / deposits") - } else if err := WriteValidatorDepositWithdrawals(day); err != nil { + } else if err := WriteValidatorDepositWithdrawals(day, tx); err != nil { return fmt.Errorf("error in WriteValidatorDepositWithdrawals: %w", err) } if exported.BlockStats { logger.Infof("Skipping block stats") - } else if err := WriteValidatorBlockStats(day); err != nil { + } else if err := WriteValidatorBlockStats(day, tx); err != nil { return fmt.Errorf("error in WriteValidatorBlockStats: %w", err) } if exported.Balance { logger.Infof("Skipping balances") - } else if err := WriteValidatorBalances(validators, day); err != nil { + } else if err := WriteValidatorBalances(validators, day, tx); err != nil { return fmt.Errorf("error in WriteValidatorBalances: %w", err) } if exported.ClRewards { logger.Infof("Skipping cl rewards") - } else if err := WriteValidatorClIcome(validators, day, concurrencyCl); err != nil { + } else if err := WriteValidatorClIcome(validators, day, concurrencyCl, tx); err != nil { return fmt.Errorf("error in WriteValidatorClIcome: %w", err) } if exported.ElRewards { logger.Infof("Skipping el rewards") - } else if err := WriteValidatorElIcome(day); err != nil { + } else if err := WriteValidatorElIcome(day, tx); err != nil { return fmt.Errorf("error in WriteValidatorElIcome: %w", err) } if exported.TotalAccumulation { logger.Infof("Skipping total accumulation") - } else if err := WriteValidatorTotalAccumulation(day, concurrencyTotal); err != nil { + } else if err := WriteValidatorTotalAccumulation(day, concurrencyTotal, tx); err != nil { return fmt.Errorf("error in WriteValidatorTotalAccumulation: %w", err) } if exported.TotalPerformance { logger.Infof("Skipping total performance") - } else if err := WriteValidatorTotalPerformance(day, concurrencyTotal); err != nil { + } else if err := WriteValidatorTotalPerformance(day, concurrencyTotal, tx); err != nil { return fmt.Errorf("error in WriteValidatorTotalPerformance: %w", err) } - if err := WriteValidatorStatsExported(day); err != nil { + if err := WriteValidatorStatsExported(day, tx); err != nil { return fmt.Errorf("error in WriteValidatorStatsExported: %w", err) } + err = tx.Commit() + + if err != nil { + return fmt.Errorf("error committing tx: %w", err) + } logger.Infof("statistics export of day %v completed, took %v", day, time.Since(exportStart)) return nil } -func WriteValidatorStatsExported(day uint64) error { - tx, err := WriterDb.Beginx() - if err != nil { - return err - } - defer tx.Rollback() +func WriteValidatorStatsExported(day uint64, tx *sqlx.Tx) error { start := time.Now() logger.Infof("marking day export as completed in the validator_stats_status table for day %v", day) - _, err = tx.Exec(` + _, err := tx.Exec(` UPDATE validator_stats_status SET status = true WHERE day=$1 @@ -179,14 +186,10 @@ func WriteValidatorStatsExported(day uint64) error { } logger.Infof("marking completed, took %v", time.Since(start)) - err = tx.Commit() - if err != nil { - return err - } return nil } -func WriteValidatorTotalAccumulation(day uint64, concurrency uint64) error { +func WriteValidatorTotalAccumulation(day uint64, concurrency uint64, tx *sqlx.Tx) error { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute*10)) defer cancel() exportStart := time.Now() @@ -208,7 +211,7 @@ func WriteValidatorTotalAccumulation(day uint64, concurrency uint64) error { CurrentFailedAttestations bool `db:"cur_failed_attestations_exported"` } exported := Exported{} - err := ReaderDb.Get(&exported, ` + err := tx.Get(&exported, ` SELECT last.total_accumulation_exported as last_total_accumulation_exported, cur.cl_rewards_exported as cur_cl_rewards_exported, @@ -251,7 +254,7 @@ func WriteValidatorTotalAccumulation(day uint64, concurrency uint64) error { return gCtx.Err() default: } - _, err = WriterDb.Exec(`INSERT INTO validator_stats ( + _, err = tx.Exec(`INSERT INTO validator_stats ( validatorindex, day, @@ -300,7 +303,7 @@ func WriteValidatorTotalAccumulation(day uint64, concurrency uint64) error { } logger.Infof("export completed, took %v", time.Since(start)) - if err = markColumnExported(day, "total_accumulation_exported"); err != nil { + if err = markColumnExported(day, "total_accumulation_exported", tx); err != nil { return err } @@ -308,7 +311,7 @@ func WriteValidatorTotalAccumulation(day uint64, concurrency uint64) error { return nil } -func WriteValidatorTotalPerformance(day uint64, concurrency uint64) error { +func WriteValidatorTotalPerformance(day uint64, concurrency uint64, tx *sqlx.Tx) error { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute*10)) defer cancel() exportStart := time.Now() @@ -327,7 +330,7 @@ func WriteValidatorTotalPerformance(day uint64, concurrency uint64) error { CurrentTotalAccumulation bool `db:"cur_total_accumulation_exported"` } exported := Exported{} - err := ReaderDb.Get(&exported, ` + err := tx.Get(&exported, ` SELECT last.total_performance_exported as last_total_performance_exported, cur.total_accumulation_exported as cur_total_accumulation_exported @@ -368,7 +371,7 @@ func WriteValidatorTotalPerformance(day uint64, concurrency uint64) error { default: } - _, err = WriterDb.Exec(`insert into validator_performance ( + _, err = tx.Exec(`insert into validator_performance ( validatorindex, balance, rank7d, @@ -460,7 +463,7 @@ func WriteValidatorTotalPerformance(day uint64, concurrency uint64) error { start = time.Now() logger.Infof("populate validator_performance rank7d") - _, err = WriterDb.Exec(` + _, err = tx.Exec(` WITH ranked_performance AS ( SELECT validatorindex, @@ -478,7 +481,7 @@ func WriteValidatorTotalPerformance(day uint64, concurrency uint64) error { logger.Infof("export completed, took %v", time.Since(start)) - if err = markColumnExported(day, "total_performance_exported"); err != nil { + if err = markColumnExported(day, "total_performance_exported", tx); err != nil { return err } @@ -486,7 +489,7 @@ func WriteValidatorTotalPerformance(day uint64, concurrency uint64) error { return nil } -func WriteValidatorBlockStats(day uint64) error { +func WriteValidatorBlockStats(day uint64, tx *sqlx.Tx) error { exportStart := time.Now() defer func() { metrics.TaskDuration.WithLabelValues("db_update_validator_block_stats").Observe(time.Since(exportStart).Seconds()) @@ -498,16 +501,10 @@ func WriteValidatorBlockStats(day uint64) error { firstEpoch, lastEpoch := utils.GetFirstAndLastEpochForDay(day) - tx, err := WriterDb.Beginx() - if err != nil { - return err - } - defer tx.Rollback() - start := time.Now() logger.Infof("exporting proposed_blocks, missed_blocks and orphaned_blocks statistics") - _, err = tx.Exec(` + _, err := tx.Exec(` insert into validator_stats (validatorindex, day, proposed_blocks, missed_blocks, orphaned_blocks) ( select proposer, $3, sum(case when status = '1' then 1 else 0 end), sum(case when status = '2' then 1 else 0 end), sum(case when status = '3' then 1 else 0 end) @@ -537,13 +534,9 @@ func WriteValidatorBlockStats(day uint64) error { if err != nil { return fmt.Errorf("error inserting slashings into validator_stats for day [%v], firstEpoch [%v] and lastEpoch [%v]: %w", day, firstEpoch, lastEpoch, err) } - - if err = tx.Commit(); err != nil { - return err - } logger.Infof("export completed, took %v", time.Since(start)) - if err = markColumnExported(day, "block_stats_exported"); err != nil { + if err = markColumnExported(day, "block_stats_exported", tx); err != nil { return err } @@ -551,7 +544,7 @@ func WriteValidatorBlockStats(day uint64) error { return nil } -func WriteValidatorElIcome(day uint64) error { +func WriteValidatorElIcome(day uint64, tx *sqlx.Tx) error { exportStart := time.Now() defer func() { metrics.TaskDuration.WithLabelValues("db_update_validator_el_income_stats").Observe(time.Since(exportStart).Seconds()) @@ -563,12 +556,6 @@ func WriteValidatorElIcome(day uint64) error { firstEpoch, lastEpoch := utils.GetFirstAndLastEpochForDay(day) - tx, err := WriterDb.Beginx() - if err != nil { - return err - } - defer tx.Rollback() - start := time.Now() logger.Infof("exporting mev & el rewards") @@ -584,7 +571,7 @@ func WriteValidatorElIcome(day uint64) error { blocks := make([]*Container, 0) blocksMap := make(map[uint64]*Container) - err = tx.Select(&blocks, "SELECT slot, exec_block_number, proposer FROM blocks WHERE epoch >= $1 AND epoch <= $2 AND exec_block_number > 0 AND status = '1'", firstEpoch, lastEpoch) + err := tx.Select(&blocks, "SELECT slot, exec_block_number, proposer FROM blocks WHERE epoch >= $1 AND epoch <= $2 AND exec_block_number > 0 AND status = '1'", firstEpoch, lastEpoch) if err != nil { return fmt.Errorf("error retrieving blocks data for firstEpoch [%v] and lastEpoch [%v]: %w", firstEpoch, lastEpoch, err) } @@ -656,12 +643,9 @@ func WriteValidatorElIcome(day uint64) error { } } - if err = tx.Commit(); err != nil { - return err - } logger.Infof("export completed, took %v", time.Since(start)) - if err = markColumnExported(day, "el_rewards_exported"); err != nil { + if err = markColumnExported(day, "el_rewards_exported", tx); err != nil { return err } @@ -669,7 +653,7 @@ func WriteValidatorElIcome(day uint64) error { return nil } -func WriteValidatorClIcome(validators []uint64, day uint64, concurrency uint64) error { +func WriteValidatorClIcome(validators []uint64, day uint64, concurrency uint64, tx *sqlx.Tx) error { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute*10)) defer cancel() exportStart := time.Now() @@ -689,7 +673,7 @@ func WriteValidatorClIcome(validators []uint64, day uint64, concurrency uint64) CurrentWithdrawalsDepositsExported bool `db:"cur_withdrawals_deposits_exported"` } exported := Exported{} - err := ReaderDb.Get(&exported, ` + err := tx.Get(&exported, ` SELECT last.balance_exported as last_balance_exported, cur.balance_exported as cur_balance_exported, cur.withdrawals_deposits_exported as cur_withdrawals_deposits_exported FROM validator_stats_status cur INNER JOIN validator_stats_status last @@ -759,7 +743,7 @@ func WriteValidatorClIcome(validators []uint64, day uint64, concurrency uint64) ON CONFLICT (validatorindex, day) DO UPDATE SET cl_rewards_gwei = excluded.cl_rewards_gwei;` } - _, err = WriterDb.Exec(stmt, day, start, end) + _, err = tx.Exec(stmt, day, start, end) if err != nil { return fmt.Errorf("error inserting cl_rewards_gwei into validator_stats for day [%v], start [%v] and end [%v]: %w", day, start, end, err) } @@ -775,7 +759,7 @@ func WriteValidatorClIcome(validators []uint64, day uint64, concurrency uint64) logger.Infof("export completed, took %v", time.Since(start)) - if err = markColumnExported(day, "cl_rewards_exported"); err != nil { + if err = markColumnExported(day, "cl_rewards_exported", tx); err != nil { return err } @@ -783,7 +767,7 @@ func WriteValidatorClIcome(validators []uint64, day uint64, concurrency uint64) return nil } -func WriteValidatorBalances(validators []uint64, day uint64) error { +func WriteValidatorBalances(validators []uint64, day uint64, tx *sqlx.Tx) error { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute*10)) defer cancel() @@ -852,7 +836,7 @@ func WriteValidatorBalances(validators []uint64, day uint64) error { %s on conflict (validatorindex, day) do update set min_balance = excluded.min_balance, max_balance = excluded.max_balance, min_effective_balance = excluded.min_effective_balance, max_effective_balance = excluded.max_effective_balance, start_balance = excluded.start_balance, start_effective_balance = excluded.start_effective_balance, end_balance = excluded.end_balance, end_effective_balance = excluded.end_effective_balance;`, strings.Join(valueStrings, ",")) - _, err := WriterDb.Exec(stmt, valueArgs...) + _, err := tx.Exec(stmt, valueArgs...) if err != nil { return fmt.Errorf("error inserting balances into validator_stats for day [%v], start [%v] and end [%v]: %w", day, start, end, err) @@ -869,7 +853,7 @@ func WriteValidatorBalances(validators []uint64, day uint64) error { logger.Infof("export completed, took %v", time.Since(start)) - if err = markColumnExported(day, "balance_exported"); err != nil { + if err = markColumnExported(day, "balance_exported", tx); err != nil { return err } @@ -877,7 +861,7 @@ func WriteValidatorBalances(validators []uint64, day uint64) error { return nil } -func WriteValidatorDepositWithdrawals(day uint64) error { +func WriteValidatorDepositWithdrawals(day uint64, tx *sqlx.Tx) error { exportStart := time.Now() defer func() { metrics.TaskDuration.WithLabelValues("db_update_validator_deposit_withdrawal_stats").Observe(time.Since(exportStart).Seconds()) @@ -896,12 +880,6 @@ func WriteValidatorDepositWithdrawals(day uint64) error { } lastSlot := utils.GetLastBalanceInfoSlotForDay(day) - tx, err := WriterDb.Beginx() - if err != nil { - return err - } - defer tx.Rollback() - start := time.Now() logrus.Infof("Resetting Withdrawals + Deposits for day [%v]", day) @@ -919,7 +897,7 @@ func WriteValidatorDepositWithdrawals(day uint64) error { withdrawals_amount = NULL WHERE day = $1%s;`, firstDayExtraCondition) - _, err = tx.Exec(resetQry, day) + _, err := tx.Exec(resetQry, day) if err != nil { return fmt.Errorf("error resetting validator_stats for day [%v]: %w", day, err) } @@ -985,13 +963,10 @@ func WriteValidatorDepositWithdrawals(day uint64) error { if err != nil { return fmt.Errorf("error inserting withdrawals into validator_stats for day [%v], firstSlot [%v] and lastSlot [%v]: %w", day, firstSlot, lastSlot, err) } - if err = tx.Commit(); err != nil { - return err - } logger.Infof("export completed, took %v", time.Since(start)) - if err = markColumnExported(day, "withdrawals_deposits_exported"); err != nil { + if err = markColumnExported(day, "withdrawals_deposits_exported", tx); err != nil { return err } @@ -999,7 +974,7 @@ func WriteValidatorDepositWithdrawals(day uint64) error { return nil } -func WriteValidatorSyncDutiesForDay(validators []uint64, day uint64) error { +func WriteValidatorSyncDutiesForDay(validators []uint64, day uint64, tx *sqlx.Tx) error { exportStart := time.Now() defer func() { metrics.TaskDuration.WithLabelValues("db_update_validator_sync_stats").Observe(time.Since(exportStart).Seconds()) @@ -1026,12 +1001,6 @@ func WriteValidatorSyncDutiesForDay(validators []uint64, day uint64) error { syncStatsArr = append(syncStatsArr, stat) } - tx, err := WriterDb.Beginx() - if err != nil { - return err - } - defer tx.Rollback() - batchSize := 13000 // max parameters: 65535 for b := 0; b < len(syncStatsArr); b += batchSize { start := b @@ -1064,13 +1033,9 @@ func WriteValidatorSyncDutiesForDay(validators []uint64, day uint64) error { logrus.Infof("saving sync statistics batch %v completed", b) } - if err = tx.Commit(); err != nil { - return err - } - logger.Infof("export completed, took %v", time.Since(start)) - if err = markColumnExported(day, "sync_duties_exported"); err != nil { + if err = markColumnExported(day, "sync_duties_exported", tx); err != nil { return err } @@ -1078,7 +1043,7 @@ func WriteValidatorSyncDutiesForDay(validators []uint64, day uint64) error { return nil } -func WriteValidatorFailedAttestationsStatisticsForDay(validators []uint64, day uint64, concurrency uint64) error { +func WriteValidatorFailedAttestationsStatisticsForDay(validators []uint64, day uint64, concurrency uint64, tx *sqlx.Tx) error { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute*10)) defer cancel() exportStart := time.Now() @@ -1143,7 +1108,7 @@ func WriteValidatorFailedAttestationsStatisticsForDay(validators []uint64, day u default: } - err := saveFailedAttestationBatch(maArr[start:end], day) + err := saveFailedAttestationBatch(maArr[start:end], day, tx) if err != nil { return fmt.Errorf("error in saveFailedAttestationBatch for day [%v], start [%v] and end [%v]: %w", day, start, end, err) } @@ -1157,7 +1122,7 @@ func WriteValidatorFailedAttestationsStatisticsForDay(validators []uint64, day u } logger.Infof("export completed, took %v", time.Since(start)) - if err := markColumnExported(day, "failed_attestations_exported"); err != nil { + if err := markColumnExported(day, "failed_attestations_exported", tx); err != nil { return err } @@ -1165,7 +1130,7 @@ func WriteValidatorFailedAttestationsStatisticsForDay(validators []uint64, day u return nil } -func saveFailedAttestationBatch(batch []*types.ValidatorMissedAttestationsStatistic, day uint64) error { +func saveFailedAttestationBatch(batch []*types.ValidatorMissedAttestationsStatistic, day uint64, tx *sqlx.Tx) error { var failedAttestationBatchNumArgs int = 4 batchSize := len(batch) valueStrings := make([]string, 0, failedAttestationBatchNumArgs) @@ -1183,7 +1148,7 @@ func saveFailedAttestationBatch(batch []*types.ValidatorMissedAttestationsStatis %s on conflict (validatorindex, day) do update set missed_attestations = excluded.missed_attestations, orphaned_attestations = excluded.orphaned_attestations;`, strings.Join(valueStrings, ",")) - _, err := WriterDb.Exec(stmt, valueArgs...) + _, err := tx.Exec(stmt, valueArgs...) if err != nil { return fmt.Errorf("error inserting failed attestations into validator_stats for day [%v]: %w", day, err) } @@ -1191,11 +1156,11 @@ func saveFailedAttestationBatch(batch []*types.ValidatorMissedAttestationsStatis return nil } -func markColumnExported(day uint64, column string) error { +func markColumnExported(day uint64, column string, tx *sqlx.Tx) error { start := time.Now() logger.Infof("marking [%v] exported for day [%v] as completed in the status table", column, day) - _, err := WriterDb.Exec(fmt.Sprintf(` + _, err := tx.Exec(fmt.Sprintf(` INSERT INTO validator_stats_status (day, status, %[1]v) VALUES ($1, false, true) ON CONFLICT (day)