-
Notifications
You must be signed in to change notification settings - Fork 54
/
node.go
267 lines (196 loc) · 5.63 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
package forest
import (
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/go-xorm/xorm"
"github.com/labstack/gommon/log"
"time"
)
const (
	// JobNodePath is the etcd key prefix a job node registers under; the node
	// id is appended to it to build the node's registerPath (see NewJobNode).
	JobNodePath = "/forest/server/node/"
	// JobNodeElectPath is the single etcd key job nodes try to claim in
	// loopStartElect; whichever node's id it holds is the leader.
	JobNodeElectPath = "/forest/server/elect/leader"
	// TTL is passed to Etcd.TxKeepaliveWithTTL for both registration and
	// election. NOTE(review): unit presumably seconds (etcd lease TTL) — confirm.
	TTL = 5
)
// JobNode is one forest server instance: it registers itself in etcd, takes
// part in leader election, and owns the components built in NewJobNode.
type JobNode struct {
	// id is written as the value of both the registration key and, when this
	// node wins, the election key. Registration aborts if another node's id
	// already holds registerPath, so ids are expected to be unique.
	id string
	// registerPath is JobNodePath + id.
	registerPath string
	// electPath is the shared leader key (JobNodeElectPath).
	electPath string
	etcd      *Etcd
	// state is the current role (NodeFollowerState or NodeLeaderState),
	// written by changeState.
	state int
	// apiAddress is the httpAddress given to NewJobNode.
	apiAddress   string
	api          *JobAPi
	manager      *JobManager
	scheduler    *JobScheduler
	groupManager *JobGroupManager
	exec         *JobExecutor
	// engine is the xorm engine opened on the "mysql" driver from dbUrl.
	engine     *xorm.Engine
	collection *JobCollection
	failOver   *JobSnapshotFailOver
	// listeners are notified by changeState, in order, on every state change.
	listeners []NodeStateChangeListener
	// close is an unbuffered channel; Close sends on it to unblock Bootstrap.
	close chan bool
}
// NodeStateChangeListener is implemented by components that must be told when
// this node's election role changes (see JobNode.changeState).
type NodeStateChangeListener interface {
	// notify receives the new state; in this file it is always
	// NodeFollowerState or NodeLeaderState.
	notify(int)
}
// NewJobNode opens a "mysql" xorm engine on dbUrl and builds a JobNode whose
// registration key is JobNodePath+id. It then registers the node in etcd and
// starts election (initNode), and finally constructs the executor, group
// manager, scheduler, job manager and HTTP API, in that order.
//
// If the engine cannot be created, node is nil and err is the engine error.
//
// NOTE(review): initNode launches loopStartElect in a goroutine before the
// scheduler/manager exist and before any listener is registered, so the first
// leadership change can be applied while those fields are still nil. Confirm
// this ordering is intended.
func NewJobNode(id string, etcd *Etcd, httpAddress, dbUrl string) (node *JobNode, err error) {
	engine, openErr := xorm.NewEngine("mysql", dbUrl)
	if openErr != nil {
		return nil, openErr
	}

	n := &JobNode{
		id:           id,
		etcd:         etcd,
		engine:       engine,
		apiAddress:   httpAddress,
		state:        NodeFollowerState,
		registerPath: fmt.Sprintf("%s%s", JobNodePath, id),
		electPath:    JobNodeElectPath,
		listeners:    make([]NodeStateChangeListener, 0),
		close:        make(chan bool),
	}

	// These two are built before registration; the rest after it.
	n.failOver = NewJobSnapshotFailOver(n)
	n.collection = NewJobCollection(n)

	n.initNode()

	n.exec = NewJobExecutor(n)
	n.groupManager = NewJobGroupManager(n)
	n.scheduler = NewJobScheduler(n)
	n.manager = NewJobManager(n)
	n.api = NewJobAPi(n)

	return n, nil
}
// addListeners registers the node's scheduler as a state-change listener so
// changeState forwards role changes to it.
// NOTE(review): if node.scheduler is still nil this stores a typed-nil
// interface, which is non-nil; only call after NewJobScheduler has run.
func (node *JobNode) addListeners() {
	var scheduler NodeStateChangeListener = node.scheduler
	node.listeners = append(node.listeners, scheduler)
}
// changeState records state as the node's current role and then calls notify
// on every registered listener, in registration order. With no listeners it
// only records the state.
func (node *JobNode) changeState(state int) {
	node.state = state
	for _, l := range node.listeners {
		l.notify(state)
	}
}
// initNode registers this node under registerPath via registerJobNode. If the
// etcd call errors, or the transaction does not succeed (the key is already
// held), it logs with log.Fatalf. It relies on Fatalf terminating the process,
// since txResponse is dereferenced afterwards.
//
// On success it starts watching both the registration key and the election
// key, then runs the first election attempt in a background goroutine.
func (node *JobNode) initNode() {
	txResponse, err := node.registerJobNode()
	if err != nil {
		// Include the underlying etcd error; previously the cause was discarded.
		log.Fatalf("the job node:%s, fail register to :%s, error:%v", node.id, node.registerPath, err)
	}
	if !txResponse.Success {
		log.Fatalf("the job node:%s, fail register to :%s,the job node id exist ", node.id, node.registerPath)
	}
	log.Printf("the job node:%s, success register to :%s", node.id, node.registerPath)

	node.watchRegisterJobNode()
	node.watchElectPath()
	go node.loopStartElect()
}
// Bootstrap launches the group loader and the job-config loader in their own
// goroutines, then blocks until Close signals the close channel.
func (node *JobNode) Bootstrap() {
	loaders := []func(){
		node.groupManager.loopLoadGroups,
		node.manager.loopLoadJobConf,
	}
	for _, load := range loaders {
		go load()
	}

	// Park here until Close is called.
	<-node.close
}
// Close asks Bootstrap to return by sending on the close channel. The channel
// is unbuffered (see NewJobNode), so Close blocks until a Bootstrap call
// receives the value.
func (node *JobNode) Close() {
	stop := true
	node.close <- stop
}
// watchRegisterJobNode subscribes to changes on this node's registration key
// and, from a dedicated goroutine, passes every received event to
// handleRegisterJobNodeChangeEvent until the event stream ends.
func (node *JobNode) watchRegisterJobNode() {
	watcher := node.etcd.Watch(node.registerPath)
	go func() {
		for event := range watcher.Event {
			node.handleRegisterJobNodeChangeEvent(event)
		}
	}()
}
// handleRegisterJobNodeChangeEvent reacts to events on the registration key.
// Create and update events are ignored. A delete means this node's
// registration was lost, so it re-registers in a background goroutine.
func (node *JobNode) handleRegisterJobNodeChangeEvent(changeEvent *KeyChangeEvent) {
	if changeEvent.Type != KeyDeleteChangeEvent {
		return
	}
	log.Printf("found the job node:%s register to path:%s has lose", node.id, node.registerPath)
	go node.loopRegisterJobNode()
}
// registerJobNode tries to write this node's id under registerPath through
// Etcd.TxKeepaliveWithTTL with the package TTL, returning its result as-is.
func (node *JobNode) registerJobNode() (txResponse *TxResponse, err error) {
	txResponse, err = node.etcd.TxKeepaliveWithTTL(node.registerPath, node.id, TTL)
	return txResponse, err
}
// loopRegisterJobNode re-registers this node, typically after its
// registration key was deleted. It behaves as follows:
//
//   - If the etcd call errors, it logs the error, waits one second and retries.
//   - If the transaction succeeds, registration is done.
//   - If the key is already held by a different node id, it waits one second
//     and logs with log.Fatalf.
//   - If the key is already held by this node's id, it is treated as
//     registered.
func (node *JobNode) loopRegisterJobNode() {
	for {
		txResponse, err := node.registerJobNode()
		if err != nil {
			// Log the cause; previously retries hid why registration failed.
			log.Printf("the job node:%s, fail register to :%s, error:%v", node.id, node.registerPath, err)
			time.Sleep(time.Second)
			continue
		}

		if txResponse.Success {
			log.Printf("the job node:%s, success register to :%s", node.id, node.registerPath)
			return
		}

		if owner := txResponse.Value; owner != node.id {
			time.Sleep(time.Second)
			log.Fatalf("the job node:%s,the other job node :%s has already register to :%s", node.id, owner, node.registerPath)
		}
		log.Printf("the job node:%s,has already success register to :%s", node.id, node.registerPath)
		return
	}
}
// elect attempts to claim the shared leader key (electPath) for this node via
// Etcd.TxKeepaliveWithTTL with the package TTL.
func (node *JobNode) elect() (txResponse *TxResponse, err error) {
	txResponse, err = node.etcd.TxKeepaliveWithTTL(node.electPath, node.id, TTL)
	return
}
// watchElectPath subscribes to changes on the leader key and, from a dedicated
// goroutine, passes every received event to handleElectLeaderChangeEvent
// until the event stream ends.
func (node *JobNode) watchElectPath() {
	watcher := node.etcd.Watch(node.electPath)
	go func() {
		for event := range watcher.Event {
			node.handleElectLeaderChangeEvent(event)
		}
	}()
}
// handleElectLeaderChangeEvent reacts to events on the leader key. Create and
// update events are ignored. On delete the node drops to follower and re-runs
// the election. This happens synchronously, so the watcher goroutine
// processes no further events until loopStartElect returns.
func (node *JobNode) handleElectLeaderChangeEvent(changeEvent *KeyChangeEvent) {
	if changeEvent.Type == KeyDeleteChangeEvent {
		node.changeState(NodeFollowerState)
		node.loopStartElect()
	}
}
// loopStartElect tries to claim the leader key via elect. It behaves as
// follows:
//
//   - If the etcd call errors, it logs the error, waits one second and retries.
//   - If the transaction succeeds, the node becomes leader.
//   - If the key is held by a different node id, the node becomes follower.
//   - If the key already holds this node's id, the node stays or becomes
//     leader.
func (node *JobNode) loopStartElect() {
	for {
		txResponse, err := node.elect()
		if err != nil {
			// Log the cause; previously retries hid why the election call failed.
			log.Printf("the job node:%s,elect fail to :%s, error:%v", node.id, node.electPath, err)
			time.Sleep(time.Second)
			continue
		}

		switch v := txResponse.Value; {
		case txResponse.Success:
			node.changeState(NodeLeaderState)
			log.Printf("the job node:%s,elect success to :%s", node.id, node.electPath)
		case v != node.id:
			log.Printf("the job node:%s,give up elect request because the other job node:%s elect to:%s", node.id, v, node.electPath)
			node.changeState(NodeFollowerState)
		default:
			log.Printf("the job node:%s, has already elect success to :%s", node.id, node.electPath)
			node.changeState(NodeLeaderState)
		}
		return
	}
}