-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrocketpool.go
120 lines (103 loc) · 3.05 KB
/
rocketpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package db
import (
"strings"
"time"
"github.com/google/uuid"
pgx "github.com/jackc/pgx/v5"
"github.com/pkg/errors"
)
const (
	// identifyRocketpoolValidators sets f_pool_name to 'rocketpool' on every
	// t_identified_validators row whose f_validator_pubkey matches a row in
	// t_rocketpool.
	identifyRocketpoolValidators = `
UPDATE t_identified_validators
SET f_pool_name = 'rocketpool'
FROM t_rocketpool
WHERE t_identified_validators.f_validator_pubkey = t_rocketpool.f_validator_pubkey;
`

	// selectValidatorCount counts the rows stored in t_rocketpool.
	selectValidatorCount = `
SELECT count(*)
FROM t_rocketpool;
`
)
// IdentifyRocketpoolValidators marks as 'rocketpool' every row of
// t_identified_validators whose public key also appears in t_rocketpool.
// It only updates existing rows; it does not insert new ones.
//
// It returns a wrapped error if a connection cannot be acquired from the pool
// or if the UPDATE statement fails.
func (p *PostgresDBService) IdentifyRocketpoolValidators() error {
	conn, err := p.psqlPool.Acquire(p.ctx)
	if err != nil {
		return errors.Wrap(err, "error acquiring database connection")
	}
	defer conn.Release()

	// Use Exec rather than Query: the UPDATE produces no result set, and Exec
	// runs the statement to completion and reports its execution error here.
	// Query would hand back a Rows value that has to be closed and checked
	// before the connection can be reused.
	if _, err = conn.Exec(p.ctx, identifyRocketpoolValidators); err != nil {
		return errors.Wrap(err, "error identifying rocketpool validators")
	}
	return nil
}
// ObtainRocketpoolValidatorCount reports how many rows the t_rocketpool table
// currently holds. On failure it returns 0 together with a wrapped error
// describing whether acquiring the connection or reading the count failed.
func (p *PostgresDBService) ObtainRocketpoolValidatorCount() (int, error) {
	dbConn, acquireErr := p.psqlPool.Acquire(p.ctx)
	if acquireErr != nil {
		return 0, errors.Wrap(acquireErr, "error acquiring database connection")
	}
	defer dbConn.Release()

	var total int
	row := dbConn.QueryRow(p.ctx, selectValidatorCount)
	if scanErr := row.Scan(&total); scanErr != nil {
		return 0, errors.Wrap(scanErr, "error obtaining validator count from database")
	}
	return total, nil
}
// CopyRocketpoolValidators bulk-loads the given validator public keys into
// t_rocketpool and returns the number of rows that were actually inserted.
//
// The keys are streamed with COPY into a uniquely named temporary table and
// then merged into t_rocketpool with ON CONFLICT DO NOTHING, so keys that are
// already present are skipped and not counted. The call registers itself on
// p.writerThreadsWG for its whole duration.
//
// The method has no error return: when acquiring the connection, creating the
// temporary table, copying, or inserting fails, the error is logged and 0 is
// returned. An empty rowSrc returns 0 without touching the database.
func (p *PostgresDBService) CopyRocketpoolValidators(rowSrc []string) int64 {
	if len(rowSrc) == 0 {
		return 0
	}

	p.writerThreadsWG.Add(1)
	defer p.writerThreadsWG.Done()
	startTime := time.Now()

	// A random suffix keeps the temporary table name unique per call. Dashes
	// are replaced because the name is spliced unquoted into the SQL below.
	tempTableName := "temp_rocketpool_" + strings.ReplaceAll(uuid.New().String(), "-", "_")

	// CopyFromRows expects one slice of column values per row; each row here
	// has the single f_validator_pubkey column.
	validators := make([][]interface{}, 0, len(rowSrc))
	for _, row := range rowSrc {
		validators = append(validators, []interface{}{row})
	}

	conn, err := p.psqlPool.Acquire(p.ctx)
	if err != nil {
		wlog.Errorf("error acquiring database connection: %s", err)
		return 0
	}
	defer conn.Release()

	// Staging table for the COPY. It deliberately has no constraints; the
	// de-duplication happens in the INSERT ... ON CONFLICT below.
	_, err = conn.Exec(p.ctx, `
		CREATE TEMP TABLE IF NOT EXISTS `+tempTableName+` (
			f_validator_pubkey text
		);
	`)
	if err != nil {
		wlog.Errorf("error creating temporary table %s: %s", tempTableName, err)
		return 0
	}
	// Drop the staging table on every exit path. Temporary tables live as long
	// as the session, and the connection goes back to the pool, so a table
	// left behind after a failed COPY/INSERT would linger on that connection.
	// This defer runs before conn.Release (deferred calls run in LIFO order).
	defer func() {
		if _, dropErr := conn.Exec(p.ctx, `DROP TABLE `+tempTableName+`;`); dropErr != nil {
			wlog.Errorf("error dropping temporary table %s: %s", tempTableName, dropErr)
		}
	}()

	// Stream the keys into the staging table.
	_, err = conn.CopyFrom(p.ctx, pgx.Identifier{tempTableName}, []string{"f_validator_pubkey"}, pgx.CopyFromRows(validators))
	if err != nil {
		wlog.Errorf("error copying rocketpool validators into %s: %s", tempTableName, err)
		return 0
	}

	// Merge into the main table, skipping keys that already exist.
	count, err := conn.Exec(p.ctx, `
		INSERT INTO t_rocketpool (f_validator_pubkey)
		SELECT f_validator_pubkey
		FROM `+tempTableName+`
		ON CONFLICT (f_validator_pubkey) DO NOTHING;
	`)
	if err != nil {
		wlog.Errorf("error inserting rocketpool validators: %s", err)
		return 0
	}

	inserted := count.RowsAffected()
	if inserted > 0 {
		wlog.Debugf("persisted %d rows in %f seconds", inserted, time.Since(startTime).Seconds())
	}
	return inserted
}