-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlido.go
147 lines (127 loc) · 4.38 KB
/
lido.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package db
import (
"strings"
"time"
"github.com/google/uuid"
pgx "github.com/jackc/pgx/v5"
"github.com/pkg/errors"
)
// SQL statements used by the Lido-related database methods.
const (
	// selectLidoOperatorsValidatorCount yields one row per operator index, from 0
	// up to the highest f_operator_index stored in t_lido for the protocol passed
	// as $1, ordered by index. Each row holds the number of t_lido entries for that
	// index and protocol; the LEFT JOIN makes indexes without entries count as 0.
	selectLidoOperatorsValidatorCount = `
WITH operators_sequence AS (
SELECT generate_series(0, (SELECT MAX(f_operator_index) FROM t_lido WHERE f_protocol=$1)) AS sequence_number
)
SELECT
COUNT(l.f_operator_index) as count
FROM operators_sequence os
LEFT JOIN t_lido l ON os.sequence_number = l.f_operator_index
AND l.f_protocol = $1
GROUP BY os.sequence_number
ORDER BY os.sequence_number ASC;
`

	// identifyLidoValidators copies the operator name from t_lido into f_pool_name
	// of every t_identified_validators row that shares the same validator pubkey.
	identifyLidoValidators = `
UPDATE t_identified_validators
SET f_pool_name = t_lido.f_operator
FROM t_lido
WHERE t_identified_validators.f_validator_pubkey = t_lido.f_validator_pubkey;
`
)

// Protocol identifiers stored in the f_protocol column of t_lido.
const (
	// LidoProtocolCurated is the f_protocol value "curated".
	LidoProtocolCurated = "curated"
	// LidoProtocolCSM is the f_protocol value "csm".
	LidoProtocolCSM = "csm"
)
// IdentifyLidoValidators sets f_pool_name in t_identified_validators to the
// Lido operator name for every validator whose pubkey is also present in
// t_lido (see the identifyLidoValidators statement).
//
// The call registers itself in writerThreadsWG for its whole duration. It
// returns a wrapped error when a pool connection cannot be acquired or when
// the UPDATE statement fails, and nil otherwise.
func (p *PostgresDBService) IdentifyLidoValidators() error {
	p.writerThreadsWG.Add(1)
	defer p.writerThreadsWG.Done()

	conn, err := p.psqlPool.Acquire(p.ctx)
	if err != nil {
		return errors.Wrap(err, "error acquiring database connection")
	}
	defer conn.Release()

	// Exec rather than Query: the UPDATE produces no result set, and Query would
	// hand back a Rows value that must be closed and checked before the
	// connection is reusable and before execution errors become visible.
	if _, err = conn.Exec(p.ctx, identifyLidoValidators); err != nil {
		return errors.Wrap(err, "error identifying lido validators")
	}
	return nil
}
// ObtainLidoOperatorsValidatorCount returns the number of validators stored in
// t_lido for each operator index of the given protocol.
//
// The slice has one entry per operator index, in ascending order starting at
// index 0 and ending at the highest index stored for the protocol; indexes
// without validators are reported as 0. The slice is nil when the query
// yields no rows.
//
// A wrapped error is returned when a connection cannot be acquired, the query
// fails, a row cannot be scanned, or row iteration ends with an error.
func (p *PostgresDBService) ObtainLidoOperatorsValidatorCount(protocol string) ([]uint64, error) {
	conn, err := p.psqlPool.Acquire(p.ctx)
	if err != nil {
		return nil, errors.Wrap(err, "error acquiring database connection")
	}
	defer conn.Release()

	rows, err := conn.Query(p.ctx, selectLidoOperatorsValidatorCount, protocol)
	if err != nil {
		return nil, errors.Wrap(err, "error obtaining validator count from database")
	}
	defer rows.Close()

	var operatorsValidatorCount []uint64
	for rows.Next() {
		var count uint64
		if err = rows.Scan(&count); err != nil {
			return nil, errors.Wrap(err, "error scanning validator count from database")
		}
		operatorsValidatorCount = append(operatorsValidatorCount, count)
	}
	// rows.Next also returns false when iteration aborts on an error; without
	// this check a truncated result would be returned as if it were complete.
	if err = rows.Err(); err != nil {
		return nil, errors.Wrap(err, "error iterating validator count rows from database")
	}
	return operatorsValidatorCount, nil
}
// CopyLidoOperatorValidators bulk-inserts the validators of one operator into
// t_lido and returns the number of rows affected by the final INSERT.
//
// Each element of rowSrc is written as f_validator_pubkey, paired with the
// given operator, operatorIndex and protocol. The rows are first copied into a
// uniquely named temporary table and then inserted into t_lido with
// ON CONFLICT (f_validator_pubkey) DO NOTHING, so pubkeys already present in
// t_lido are skipped. The temporary table is dropped afterwards.
//
// An empty rowSrc returns 0 without touching the database. Any database
// failure is reported through wlog.Fatalf (presumably terminating the process
// - confirm against the logger's configuration); no error is returned.
// The call registers itself in writerThreadsWG while it runs.
func (p *PostgresDBService) CopyLidoOperatorValidators(operator string, operatorIndex uint64, rowSrc []string, protocol string) int64 {
	if len(rowSrc) == 0 {
		return 0
	}

	p.writerThreadsWG.Add(1)
	defer p.writerThreadsWG.Done()

	startTime := time.Now()

	// A random UUID suffix (dashes replaced, as they are awkward in identifiers)
	// gives every call its own temporary table name.
	randomText := uuid.New().String()
	tempTableName := "temp_lido_" + strings.ReplaceAll(randomText, "-", "_")

	// The final length is known, so allocate the backing array once.
	validators := make([][]interface{}, 0, len(rowSrc))
	for _, row := range rowSrc {
		validators = append(validators, []interface{}{row, operator, operatorIndex, protocol})
	}

	// Acquire a database connection
	conn, err := p.psqlPool.Acquire(p.ctx)
	if err != nil {
		wlog.Fatalf("error acquiring database connection: %v", err)
	}
	defer conn.Release()

	// Create the temporary staging table. It carries no constraints of its own;
	// duplicates are filtered later by the ON CONFLICT clause on t_lido.
	_, err = conn.Exec(p.ctx, `
CREATE TEMP TABLE IF NOT EXISTS `+tempTableName+` (
f_validator_pubkey text,
f_operator text,
f_operator_index integer,
f_protocol text
);
`)
	if err != nil {
		wlog.Fatalf("error creating temporary table: %v", err)
	}

	// Copy the data to the temporary table
	_, err = conn.CopyFrom(p.ctx, pgx.Identifier{tempTableName}, []string{"f_validator_pubkey", "f_operator", "f_operator_index", "f_protocol"}, pgx.CopyFromRows(validators))
	if err != nil {
		wlog.Fatalf("error copying data to temporary table: %v", err)
	}

	// Insert the data from the temporary table to the main table, skipping
	// pubkeys that t_lido already contains.
	count, err := conn.Exec(p.ctx, `
INSERT INTO t_lido (f_validator_pubkey, f_operator, f_operator_index, f_protocol)
SELECT f_validator_pubkey, f_operator, f_operator_index, f_protocol
FROM `+tempTableName+`
ON CONFLICT (f_validator_pubkey) DO NOTHING;
`)
	if err != nil {
		wlog.Fatalf("error inserting data from temporary table to main table: %v", err)
	}

	// Drop the temporary table
	_, err = conn.Exec(p.ctx, `DROP TABLE `+tempTableName+`;`)
	if err != nil {
		wlog.Fatalf("error dropping temporary table: %v", err)
	}

	persisted := count.RowsAffected()
	if persisted > 0 {
		wlog.Debugf("persisted %d rows in %f seconds", persisted, time.Since(startTime).Seconds())
	}
	return persisted
}