-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathidentified_validators.go
104 lines (95 loc) · 2.42 KB
/
identified_validators.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package db
import "github.com/pkg/errors"
// SQL statements used to maintain the t_identified_validators table.
const (
// addNewValidatorsQuery inserts every distinct, non-empty validator pubkey
// found in t_beacon_deposits into t_identified_validators with the pool name
// 'solo_stakers'. Pubkeys that already exist in the target table are left
// untouched (ON CONFLICT ... DO NOTHING).
addNewValidatorsQuery = `
INSERT INTO t_identified_validators (f_validator_pubkey, f_pool_name)
SELECT DISTINCT F_VALIDATOR_PUBKEY, 'solo_stakers'::text
FROM T_BEACON_DEPOSITS
WHERE F_VALIDATOR_PUBKEY != ''
ON CONFLICT (f_validator_pubkey) DO NOTHING;
`
// truncateIdentifiedValidatorsQuery removes all rows from
// t_identified_validators.
truncateIdentifiedValidatorsQuery = `
TRUNCATE TABLE t_identified_validators;
`
// identifyWhalesQuery relabels validators that belong to large depositors.
// Reading from the innermost subquery outwards:
//  1. aux: distinct (validator pubkey, depositor) pairs from
//     t_beacon_deposits, ignoring empty pubkeys.
//  2. count_subquery: number of such pairs per depositor.
//  3. subquery: depositors whose count is >= $1, each given the label
//     'whale_0x' followed by the first 4 characters of f_depositor.
//  4. subquery2: the non-empty validator pubkeys deposited by those
//     depositors, paired with the depositor's label. The WHERE filter on the
//     pubkey drops rows with no matching deposit, so the RIGHT JOIN keeps
//     only matched rows.
//
// The outer UPDATE then sets f_pool_name to that label for every row of
// t_identified_validators whose pubkey appears in subquery2.
//
// $1 is the minimum number of distinct validators a depositor must have
// funded to be treated as a whale.
//
// NOTE(review): if one pubkey has deposits from more than one qualifying
// depositor, subquery2 yields several rows for it and the query does not
// determine which label is applied — confirm whether that matters.
identifyWhalesQuery = `
UPDATE t_identified_validators
SET f_pool_name = subquery2.whale_id
FROM (
SELECT DISTINCT
t1.f_validator_pubkey,
whale_id
FROM
t_beacon_deposits t1
RIGHT JOIN (
SELECT
f_depositor,
CONCAT('whale_0x', LEFT(f_depositor, 4)) AS whale_id
FROM (
SELECT
COUNT(*) AS COUNT,
F_DEPOSITOR
FROM
(
SELECT DISTINCT
F_VALIDATOR_PUBKEY,
F_DEPOSITOR
FROM
T_BEACON_DEPOSITS
WHERE
F_VALIDATOR_PUBKEY != ''
) aux
GROUP BY
F_DEPOSITOR
) AS count_subquery
WHERE
count >= $1
) AS subquery
ON
t1.f_depositor = subquery.f_depositor
WHERE
F_VALIDATOR_PUBKEY != ''
) AS subquery2
WHERE
t_identified_validators.f_validator_pubkey = subquery2.f_validator_pubkey;
`
)
// AddNewValidators runs addNewValidatorsQuery on a connection taken from the
// pool. The call is counted in p.writerThreadsWG from entry until it returns,
// and the connection is released back to the pool on return.
//
// It returns a wrapped error if the connection cannot be acquired or if the
// statement fails, and nil otherwise.
func (p *PostgresDBService) AddNewValidators() error {
	p.writerThreadsWG.Add(1)
	defer p.writerThreadsWG.Done()

	pooledConn, acquireErr := p.psqlPool.Acquire(p.ctx)
	if acquireErr != nil {
		return errors.Wrap(acquireErr, "error acquiring connection from pool")
	}
	defer pooledConn.Release()

	// The command tag returned by Exec is not needed; only failure matters.
	if _, execErr := pooledConn.Exec(p.ctx, addNewValidatorsQuery); execErr != nil {
		return errors.Wrap(execErr, "error adding new validators to database")
	}
	return nil
}
// TruncateIdentifiedValidators runs truncateIdentifiedValidatorsQuery on a
// connection taken from the pool. The call is counted in p.writerThreadsWG
// from entry until it returns, and the connection is released back to the
// pool on return.
//
// It returns a wrapped error if the connection cannot be acquired or if the
// statement fails, and nil otherwise.
func (p *PostgresDBService) TruncateIdentifiedValidators() error {
	p.writerThreadsWG.Add(1)
	defer p.writerThreadsWG.Done()

	pooledConn, acquireErr := p.psqlPool.Acquire(p.ctx)
	if acquireErr != nil {
		return errors.Wrap(acquireErr, "error acquiring connection from pool")
	}
	defer pooledConn.Release()

	// The command tag returned by Exec is not needed; only failure matters.
	if _, execErr := pooledConn.Exec(p.ctx, truncateIdentifiedValidatorsQuery); execErr != nil {
		return errors.Wrap(execErr, "error truncating identified validators")
	}
	return nil
}
// IdentifyWhales runs identifyWhalesQuery on a connection taken from the
// pool, passing threshold as the query's $1 parameter (the minimum number of
// distinct validators a depositor must have funded to be labelled a whale).
//
// It returns a wrapped error if the connection cannot be acquired or if the
// statement fails, and nil otherwise.
func (p *PostgresDBService) IdentifyWhales(threshold int) error {
	// This method writes to t_identified_validators, so it is counted in
	// writerThreadsWG for its whole duration, the same way AddNewValidators
	// and TruncateIdentifiedValidators are. Without this, anything waiting on
	// the group would not wait for an in-flight UPDATE.
	p.writerThreadsWG.Add(1)
	defer p.writerThreadsWG.Done()

	conn, err := p.psqlPool.Acquire(p.ctx)
	if err != nil {
		return errors.Wrap(err, "error acquiring database connection")
	}
	defer conn.Release()

	// The command tag returned by Exec is not needed; only failure matters.
	if _, err = conn.Exec(p.ctx, identifyWhalesQuery, threshold); err != nil {
		return errors.Wrap(err, "error identifying whales")
	}
	return nil
}