-
Notifications
You must be signed in to change notification settings - Fork 1
/
types.go
213 lines (183 loc) · 5.6 KB
/
types.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
package gomaxscale
import (
"encoding/json"
"fmt"
"strings"
"sync/atomic"
"time"
)
// CDCEventType names the category of a CDC event.
type CDCEventType string

// Event categories returned by CDCEvent.EventType.
const (
	// CDCEventTypeDDL identifies DDL (Data Definition Language) events, which
	// describe database changes.
	CDCEventTypeDDL = CDCEventType("ddlEvent")
	// CDCEventTypeDML identifies DML (Data Manipulation Language) events,
	// which describe data changes.
	CDCEventTypeDML = CDCEventType("dmlEvent")
)
// CDCEvent is the CDC event received from MaxScale.
//
// In this file it is implemented by DDLEvent and DMLEvent; callers can use
// EventType (or a type switch) to find out which one they are holding.
type CDCEvent interface {
	// EventType reports which kind of event the value represents.
	EventType() CDCEventType
}
// DDLEvent is a MaxScale DDL event.
//
// Every field is filled from the JSON key named in its struct tag. Fields
// holds one entry per column definition, each decoded by
// DDLEventField.UnmarshalJSON.
//
// https://github.com/mariadb-corporation/MaxScale/blob/maxscale-6.2.4/Documentation/Routers/KafkaCDC.md#overview
type DDLEvent struct {
	Namespace string          `json:"namespace"`
	Type      string          `json:"type"`
	Name      string          `json:"name"`
	Table     string          `json:"table"`
	Database  string          `json:"database"`
	Version   int             `json:"version"`
	GTID      string          `json:"gtid"`
	Fields    []DDLEventField `json:"fields"`
}
// DDLEventField is a field in a DDL event.
//
// Type is decoded by UnmarshalJSON from the JSON "type" entry, which may be a
// string, an array of strings or an object. RealType, Length and Unsigned are
// pointers, so they stay nil when the corresponding JSON key is absent or
// null.
type DDLEventField struct {
	Name     string             `json:"name"`
	Type     DDLEventFieldValue `json:"type"`
	RealType *string            `json:"real_type"`
	Length   *int               `json:"length"`
	Unsigned *bool              `json:"unsigned"`
}
// UnmarshalJSON decodes the JSON into the field.
//
// The "type" entry of a column definition can take three shapes:
//
//   - a string, e.g. "long": stored as a DDLEventFieldValueSimple;
//   - an array of strings, e.g. ["null", "long"]: stored as a
//     DDLEventFieldValueSimple whose Null flag is set when one of the entries
//     is "null" (case-insensitive);
//   - an object whose "type" is "enum" (case-insensitive): stored as a
//     DDLEventFieldValueEnum built from its "name" and "symbols" entries.
//
// Any other shape results in an error, in which case d is left untouched.
func (d *DDLEventField) UnmarshalJSON(b []byte) error {
	var tmp struct {
		Name     string      `json:"name"`
		Type     interface{} `json:"type"`
		RealType *string     `json:"real_type"`
		Length   *int        `json:"length"`
		Unsigned *bool       `json:"unsigned"`
	}
	if err := json.Unmarshal(b, &tmp); err != nil {
		return err
	}

	// Resolve the type before touching d so that a failure does not leave the
	// receiver partially populated.
	fieldType, err := decodeDDLEventFieldType(tmp.Type)
	if err != nil {
		return err
	}

	d.Name = tmp.Name
	d.Type = fieldType
	d.RealType = tmp.RealType
	d.Length = tmp.Length
	d.Unsigned = tmp.Unsigned
	return nil
}

// decodeDDLEventFieldType converts the generically decoded "type" entry of a
// column definition into its typed representation.
func decodeDDLEventFieldType(raw interface{}) (DDLEventFieldValue, error) {
	switch t := raw.(type) {
	case string:
		return DDLEventFieldValueSimple{ValueType: t}, nil

	case []interface{}:
		simple, err := decodeDDLEventFieldUnion(t)
		if err != nil {
			return nil, err
		}
		return simple, nil

	case map[string]interface{}:
		typeStr, ok := t["type"].(string)
		if !ok {
			return nil, fmt.Errorf("missing type in complex column definition: %#v", t)
		}
		switch strings.ToLower(typeStr) {
		case "enum":
			enum, err := decodeDDLEventFieldEnum(t)
			if err != nil {
				return nil, err
			}
			return enum, nil
		default:
			return nil, fmt.Errorf("unknown type '%s' in complex column definition: %#v", typeStr, t)
		}

	default:
		return nil, fmt.Errorf("unknown type '%T' in column definition", t)
	}
}

// decodeDDLEventFieldUnion decodes a type given as a list of type names, such
// as ["null", "long"], into a simple value.
func decodeDDLEventFieldUnion(entries []interface{}) (DDLEventFieldValueSimple, error) {
	var (
		simple      DDLEventFieldValueSimple
		sawConcrete bool
	)
	for _, entry := range entries {
		value, ok := entry.(string)
		if !ok {
			return DDLEventFieldValueSimple{}, fmt.Errorf("invalid simple value of type '%T'", entry)
		}
		isNull := strings.EqualFold(value, "null")
		if isNull {
			simple.Null = true
		}
		// A "null" entry only flags nullability: it must not replace a
		// concrete type name that was already seen, otherwise the result
		// would depend on the order of the entries.
		if !isNull || !sawConcrete {
			simple.ValueType = value
		}
		if !isNull {
			sawConcrete = true
		}
	}
	return simple, nil
}

// decodeDDLEventFieldEnum decodes an enum column definition from its "name"
// and "symbols" entries.
func decodeDDLEventFieldEnum(def map[string]interface{}) (DDLEventFieldValueEnum, error) {
	name, ok := def["name"].(string)
	if !ok {
		return DDLEventFieldValueEnum{}, fmt.Errorf("missing name in enum column definition: %#v", def)
	}
	rawSymbols, ok := def["symbols"].([]interface{})
	if !ok {
		return DDLEventFieldValueEnum{}, fmt.Errorf("missing symbols in enum column definition: %#v", def)
	}

	symbols := make([]string, 0, len(rawSymbols))
	for _, raw := range rawSymbols {
		symbol, ok := raw.(string)
		if !ok {
			return DDLEventFieldValueEnum{}, fmt.Errorf("symbol '%[1]v' (%[1]T) is not a string", raw)
		}
		symbols = append(symbols, symbol)
	}
	return DDLEventFieldValueEnum{Name: name, Symbols: symbols}, nil
}
// EventType implements CDCEvent; a DDLEvent always reports CDCEventTypeDDL.
func (DDLEvent) EventType() CDCEventType {
	return CDCEventTypeDDL
}
// DDLEventFieldType names the kind of value describing a DDL event field.
type DDLEventFieldType string

// Field kinds returned by DDLEventFieldValue.Type.
const (
	// DDLEventFieldTypeSimple identifies a simple field type.
	DDLEventFieldTypeSimple = DDLEventFieldType("simple")
	// DDLEventFieldTypeEnum identifies an enum field type.
	DDLEventFieldTypeEnum = DDLEventFieldType("enum")
)
// DDLEventFieldValue is the generic representation of a field in a DDL event.
//
// In this file it is implemented by DDLEventFieldValueSimple and
// DDLEventFieldValueEnum; use Type or a type switch to tell them apart.
type DDLEventFieldValue interface {
	// Type reports which concrete kind of field value this is.
	Type() DDLEventFieldType
}
// DDLEventFieldValueSimple is a simple field type.
//
// ValueType carries the type name taken from the column definition. Null is
// set by DDLEventField.UnmarshalJSON when the definition is a list of type
// names that contains "null" (compared case-insensitively).
type DDLEventFieldValueSimple struct {
	ValueType string
	Null      bool
}
// Type implements DDLEventFieldValue; a simple value always reports
// DDLEventFieldTypeSimple.
func (DDLEventFieldValueSimple) Type() DDLEventFieldType {
	return DDLEventFieldTypeSimple
}
// DDLEventFieldValueEnum is an enum field type.
//
// Name and Symbols mirror the "name" and "symbols" entries that
// DDLEventField.UnmarshalJSON reads from an enum column definition.
type DDLEventFieldValueEnum struct {
	Name    string
	Symbols []string
}
// Type implements DDLEventFieldValue; an enum value always reports
// DDLEventFieldTypeEnum.
func (DDLEventFieldValueEnum) Type() DDLEventFieldType {
	return DDLEventFieldTypeEnum
}
// DMLEvent is a MaxScale DML event.
//
// The tagged fields are filled from the JSON keys named in their struct tags.
//
// https://github.com/mariadb-corporation/MaxScale/blob/maxscale-6.2.4/Documentation/Routers/KafkaCDC.md#overview
type DMLEvent struct {
	Domain      int    `json:"domain"`
	ServerID    int    `json:"server_id"`
	Sequence    int    `json:"sequence"`
	EventNumber int    `json:"event_number"`
	Timestamp   int64  `json:"timestamp"`
	Type        string `json:"event_type"`
	// RawData contains all the JSON data related to this event. It should be
	// used to retrieve table-specific data. For example, the columns of a
	// table with an `id` and a `name` column could be retrieved as follows:
	//
	//	var tableData struct {
	//		ID   int    `json:"id"`
	//		Name string `json:"name"`
	//	}
	//	err := json.Unmarshal(event.RawData, &tableData)
	//
	// The `json:"-"` tag keeps encoding/json from reading or writing this
	// field, so it is presumably filled in by the code that decodes the event
	// (not visible in this file).
	RawData []byte `json:"-"`
}
// EventType implements CDCEvent; a DMLEvent always reports CDCEventTypeDML.
func (DMLEvent) EventType() CDCEventType {
	return CDCEventTypeDML
}
// Stats stores information about the running library in a specific period of
// time.
//
// NumberOfEvents counts the calls to add and ProcessingTime accumulates the
// durations passed to it. Both fields are modified through sync/atomic
// operations by the methods below.
type Stats struct {
	NumberOfEvents int64
	ProcessingTime time.Duration
}

// add accounts for one more event whose handling took processingTime.
func (s *Stats) add(processingTime time.Duration) {
	// time.Duration has int64 as its underlying type (nanoseconds), which is
	// what allows the pointer conversion needed by the atomic helpers.
	events, elapsed := &s.NumberOfEvents, (*int64)(&s.ProcessingTime)
	atomic.AddInt64(events, 1)
	atomic.AddInt64(elapsed, int64(processingTime))
}

// reset sets both counters back to zero.
func (s *Stats) reset() {
	counters := [...]*int64{&s.NumberOfEvents, (*int64)(&s.ProcessingTime)}
	for _, counter := range counters {
		atomic.StoreInt64(counter, 0)
	}
}