-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathsubscription.go
113 lines (101 loc) · 2.32 KB
/
subscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package pgcat
import (
"context"
"regexp"
"github.com/jackc/pgx/v4"
)
// subscription is the in-memory form of one pgcat_subscription row together
// with the table mappings that belong to it.
//
// The exported fields are the destinations of a "select *" row scan, in the
// order they are declared here; mapping is loaded separately from
// pgcat_table_mapping.
type subscription struct {
	// Name identifies the subscription; the table-mapping queries filter on it.
	Name string

	// Hostname, Port, Username, Password and Dbname are presumably the
	// connection parameters of the publishing database.
	// NOTE(review): confirm against the code that opens the connection.
	Hostname string
	Port     uint16
	Username string
	Password string
	Dbname   string

	// Publications is scanned from a single column into a string slice;
	// presumably the names of the publications to subscribe to.
	Publications []string

	// CopyData and Enabled are boolean columns of the subscription row.
	// NOTE(review): their exact semantics are not visible in this file.
	CopyData bool
	Enabled  bool

	// mapping holds this subscription's pgcat_table_mapping rows in ascending
	// priority order. It is not part of the scanned subscription row.
	mapping []tableMapping
}
// tableMapping is one pgcat_table_mapping row, carrying its src column both
// as text and as a compiled regular expression.
type tableMapping struct {
	// priority is the ordering key; mappings are loaded in ascending priority.
	priority int
	// src is the pattern text as stored in the table.
	src string
	// dst is the value paired with src.
	// NOTE(review): presumably the destination table name — confirm with the
	// code that applies the mapping.
	dst string
	// regexp is src compiled with the regexp package.
	regexp *regexp.Regexp
}
// querySubscription loads the subscription called name, together with its
// table mappings, using the transaction tx.
//
// The subscription row is read with "select *" from pgcat_subscription, so the
// table's column order has to match the order of the Scan destinations below.
// Table mappings are read from pgcat_table_mapping and appended to the
// subscription in ascending priority order, each with its src pattern compiled
// into a regular expression.
//
// A nil subscription and a non-nil error are returned when the subscription
// row cannot be scanned, when the mapping query or iteration fails, or when a
// src pattern is not a valid regular expression.
func querySubscription(tx pgx.Tx, name string) (*subscription, error) {
	ctx := context.Background()

	sub := &subscription{}
	err := tx.QueryRow(ctx, "select * from pgcat_subscription where name=$1", name).Scan(
		&sub.Name,
		&sub.Hostname,
		&sub.Port,
		&sub.Username,
		&sub.Password,
		&sub.Dbname,
		&sub.Publications,
		&sub.CopyData,
		&sub.Enabled,
	)
	if err != nil {
		return nil, err
	}

	rows, err := tx.Query(ctx, `select priority, src, dst from pgcat_table_mapping
where subscription=$1 order by priority`, sub.Name)
	if err != nil {
		return nil, err
	}
	// Release the result set on every return path, including early errors.
	defer rows.Close()

	for rows.Next() {
		tm := tableMapping{}
		if err := rows.Scan(&tm.priority, &tm.src, &tm.dst); err != nil {
			return nil, err
		}
		// The pattern comes from table data, so an invalid one must surface as
		// an error rather than a panic.
		re, err := regexp.Compile(tm.src)
		if err != nil {
			return nil, err
		}
		tm.regexp = re
		sub.mapping = append(sub.mapping, tm)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return sub, nil
}
// querySubscriptions loads every row of pgcat_subscription, together with each
// subscription's table mappings, using the transaction tx. The result is keyed
// by subscription name.
//
// Subscription rows are read with "select *", so the table's column order has
// to match the order of the Scan destinations below. For each subscription the
// rows of pgcat_table_mapping are appended in ascending priority order, each
// with its src pattern compiled into a regular expression.
//
// A nil map and a non-nil error are returned when any query, scan or row
// iteration fails, or when a src pattern is not a valid regular expression.
func querySubscriptions(tx pgx.Tx) (map[string]*subscription, error) {
	ctx := context.Background()

	rows, err := tx.Query(ctx, "select * from pgcat_subscription")
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	subscriptions := make(map[string]*subscription)
	for rows.Next() {
		sub := &subscription{}
		err = rows.Scan(
			&sub.Name,
			&sub.Hostname,
			&sub.Port,
			&sub.Username,
			&sub.Password,
			&sub.Dbname,
			&sub.Publications,
			&sub.CopyData,
			&sub.Enabled,
		)
		if err != nil {
			return nil, err
		}
		subscriptions[sub.Name] = sub
	}
	// Close the first result set before the per-subscription queries are
	// issued on the same transaction; the deferred Close then becomes a no-op.
	rows.Close()
	// Without this check an error that interrupts iteration would silently
	// produce a truncated map.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	// loadMappings fills sub.mapping from pgcat_table_mapping. It is a closure
	// so that each result set is closed as soon as its subscription is done.
	loadMappings := func(sub *subscription) error {
		rows, err := tx.Query(ctx, `select priority, src, dst from pgcat_table_mapping
where subscription=$1 order by priority`, sub.Name)
		if err != nil {
			return err
		}
		defer rows.Close()

		for rows.Next() {
			tm := tableMapping{}
			if err := rows.Scan(&tm.priority, &tm.src, &tm.dst); err != nil {
				return err
			}
			// The pattern comes from table data, so an invalid one must
			// surface as an error rather than a panic.
			re, err := regexp.Compile(tm.src)
			if err != nil {
				return err
			}
			tm.regexp = re
			sub.mapping = append(sub.mapping, tm)
		}
		return rows.Err()
	}

	for _, sub := range subscriptions {
		if err := loadMappings(sub); err != nil {
			return nil, err
		}
	}
	return subscriptions, nil
}