-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgraft_instance.go
301 lines (254 loc) · 9.15 KB
/
graft_instance.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
package graft
import (
"context"
"net"
"sync"
"time"
"graft/pb"
"google.golang.org/grpc"
)
// heartbeatDuration is the delay the leader's heartbeat timer is armed with
// before it invokes sendHeartbeat.
const heartbeatDuration = 10 * time.Millisecond
// GraftInstance is one member of a graft cluster. It doubles as the gRPC
// server implementation (via the embedded pb.UnimplementedGraftServer) and
// as the owner of the replicated log of operations of type T.
//
// The embedded mutex is what the methods in this file lock before touching
// the election state, leader state, cluster leader or log.
type GraftInstance[T any] struct {
	sync.Mutex
	pb.UnimplementedGraftServer

	// machineId identifies this instance within the cluster.
	machineId machineID
	// cluster is the handle used to talk to the other members.
	cluster *cluster

	electionState electionState
	leaderState   leaderState
	log           replicatedLog[T]
}

// electionState models the meta-data associated with elections.
type electionState struct {
	// electionTimer runs runElection when it fires.
	electionTimer *electionTimer
	// currentTerm is the latest term this instance has recorded.
	currentTerm int64
	// hasVoted is set once a vote has been granted and cleared whenever the
	// instance (re-)enters follower mode.
	hasVoted bool
}

// leaderState is volatile state that the machine maintains when elected
// leader.
type leaderState struct {
	// heartbeatTimer invokes sendHeartbeat; it is stopped outside of
	// leader mode.
	heartbeatTimer *time.Timer
	// nextIndex maps each member to the log index sendHeartbeat uses as the
	// start of the range of entries it ships to that member.
	nextIndex map[machineID]int
	// matchIndex maps each member to the highest log index recorded as
	// replicated on it; getNextCommittableIndex counts replicas with it.
	matchIndex map[machineID]int
}
// NewGraftInstance builds a graft instance for the machine thisMachineID
// from the raw cluster configuration.
//
// operationCommitCallback and serializer are handed to the replicated log.
// The returned instance is inert: its heartbeat timer is created and stopped
// straight away, and nothing is served until Start is called.
func NewGraftInstance[T any](configuration []byte, thisMachineID machineID, operationCommitCallback func(T), serializer Serializer[T]) *GraftInstance[T] {
	config := parseGraftConfig(configuration)

	g := new(GraftInstance[T])
	g.machineId = thisMachineID
	g.cluster = connectToCluster(config, thisMachineID)
	g.log = newLog(operationCommitCallback, serializer)
	g.leaderState.nextIndex = make(map[machineID]int)
	g.leaderState.matchIndex = make(map[machineID]int)

	// Wire the timers to their callbacks. time.AfterFunc arms the timer on
	// creation, so it is stopped immediately; transitionToLeaderMode re-arms
	// it once this instance wins an election.
	g.electionState.electionTimer = newElectionTimer(config /* electionRunner: */, g.runElection)
	heartbeat := time.AfterFunc(heartbeatDuration, g.sendHeartbeat)
	g.leaderState.heartbeatTimer = heartbeat
	heartbeat.Stop()

	return g
}
// Start brings this graft instance online: it opens a TCP listener on :8080,
// serves the Graft gRPC service on it from a background goroutine, and then
// puts the instance into follower mode at term 0.
//
// Start panics if the listener cannot be opened, and the serving goroutine
// panics if Serve returns an error.
func (m *GraftInstance[T]) Start() {
	// Fail before building the gRPC server if the port is unavailable.
	lis, err := net.Listen("tcp", ":8080")
	if err != nil {
		panic(err)
	}

	server := grpc.NewServer()
	pb.RegisterGraftServer(server, m)

	// Serve blocks, so run it off the caller's goroutine.
	go func() {
		if err := server.Serve(lis); err != nil {
			panic(err)
		}
	}()

	// RPC handlers may already be running at this point and they mutate the
	// same state under the mutex, so take it here as every other caller of
	// transitionToFollowerMode does.
	m.Lock()
	defer m.Unlock()
	m.transitionToFollowerMode( /* newTerm = */ 0)
}
// ApplyOperation submits operation for replication in the cluster-wide log.
//
// When this instance is the recorded leader the operation is appended to the
// local log, stamped with the current term; it reaches the other members on
// later heartbeats. Otherwise the serialized operation is forwarded to the
// leader through the cluster handle. The instance mutex is held throughout.
func (m *GraftInstance[T]) ApplyOperation(operation T) {
	m.Lock()
	defer m.Unlock()

	if m.cluster.currentLeader != m.machineId {
		serialized := m.log.serializer.ToString(operation)
		m.cluster.pushOperationToLeader(serialized)
		return
	}

	// We are the leader: appending to the log is all that is needed here.
	entry := logEntry[T]{
		applicationTerm: m.electionState.currentTerm,
		operation:       operation,
	}
	m.log.entries = append(m.log.entries, entry)
}
// transitionToFollowerMode moves the state machine into follower mode for
// newTerm: it records the term, clears the vote flag, starts the election
// timer and stops the heartbeat timer.
//
// It does not take the instance mutex itself.
func (m *GraftInstance[T]) transitionToFollowerMode(newTerm int64) {
	election := &m.electionState
	election.currentTerm, election.hasVoted = newTerm, false
	election.electionTimer.start()

	// Followers never send heartbeats.
	m.leaderState.heartbeatTimer.Stop()
}
// transitionToLeaderMode moves the state machine into leader mode: it
// records this machine as the cluster leader, stops the election timer and
// arms the heartbeat timer to fire after heartbeatDuration.
//
// NOTE(review): nextIndex and matchIndex are not reset here, so values from
// an earlier leadership carry over — confirm that is intended.
// NOTE(review): the heartbeat timer comes from time.AfterFunc and fires once
// per Reset; nothing visible in sendHeartbeat re-arms it — confirm where
// subsequent heartbeats get scheduled.
func (m *GraftInstance[T]) transitionToLeaderMode() {
	self := m.machineId
	m.cluster.currentLeader = self

	m.electionState.electionTimer.stop()
	m.leaderState.heartbeatTimer.Reset(heartbeatDuration)
}
// runElection is the election timer callback. It asks the cluster for votes,
// adopts the term the cluster reports back, and switches to leader mode when
// more than half of the cluster voted for this machine.
//
// The instance mutex is held for the whole election, including the vote
// request itself.
//
// NOTE(review): currentTerm is overwritten with the reported term before the
// term comparison below, so that comparison always holds and only the vote
// count decides the outcome — confirm whether the comparison was meant to use
// the term the election was started in.
func (m *GraftInstance[T]) runElection() {
	m.Lock()
	defer m.Unlock()

	ballot := &pb.RequestVoteArgs{
		Term:         m.electionState.currentTerm,
		CandidateId:  m.machineId,
		LastLogIndex: m.log.lastIndex(),
		LastLogTerm:  m.log.getLastEntry().applicationTerm,
	}
	votes, reportedTerm := m.cluster.requestVote(ballot)
	m.electionState.currentTerm = int64(reportedTerm)

	if votes <= m.cluster.clusterSize()/2 {
		return
	}
	if reportedTerm != int(m.electionState.currentTerm) {
		return
	}
	m.transitionToLeaderMode()
}
// sendHeartbeat is the heartbeat timer callback. It sends every machine in
// the cluster an AppendEntries request carrying the log entries from that
// machine's nextIndex onwards. Each request runs in its own goroutine and
// sendHeartbeat does not wait for the responses.
//
// Per response:
//   - accepted: nextIndex/matchIndex for the member are moved past the
//     entries that were actually shipped and the commit index is refreshed;
//   - rejected with a newer term: this instance falls back to follower mode;
//   - rejected otherwise: nextIndex is stepped back by one (never below 0)
//     so the next heartbeat starts from an earlier entry.
//
// A nil response from the cluster handle is ignored.
//
// NOTE(review): this function does not re-arm the heartbeat timer — confirm
// where subsequent heartbeats get scheduled.
func (m *GraftInstance[T]) sendHeartbeat() {
	m.Lock()
	currentTerm := m.electionState.currentTerm
	m.Unlock()

	for memberID := range m.cluster.machines {
		go func(memberID machineID) {
			// Snapshot everything the request needs under the lock, then
			// release it for the duration of the RPC.
			m.Lock()
			nextIndex := m.leaderState.nextIndex[memberID]
			heartbeatArgs := &pb.AppendEntriesArgs{
				Term:         currentTerm,
				LeaderId:     m.machineId,
				PrevLogIndex: int64(nextIndex - 1),
				PrevLogTerm:  m.log.getPrevEntry(nextIndex).applicationTerm,
				LeaderCommit: int64(m.log.lastCommitted),
				Entries: m.log.serializeRange(
					/* rangeStart = */ nextIndex,
				),
			}
			m.Unlock()

			// the response from this heartbeat may dictate that we need to
			// move to follower mode, ie. we had an outdated term
			response := m.cluster.appendEntryForMember(memberID, heartbeatArgs, currentTerm)
			if response == nil {
				return
			}

			m.Lock()
			defer m.Unlock()

			if !response.Accepted {
				if response.CurrentTerm > m.electionState.currentTerm {
					m.transitionToFollowerMode( /* newTerm = */ response.CurrentTerm)
				} else if m.leaderState.nextIndex[memberID] > 0 {
					// Step back one entry for the next heartbeat. Index 0 is
					// the floor: a negative nextIndex would yield a
					// PrevLogIndex below -1.
					m.leaderState.nextIndex[memberID]--
				}
				return
			}

			// The member accepted the request, so it now holds every entry
			// that was shipped. Advance by the number of entries sent rather
			// than by a fixed 1: a heartbeat with no entries must not move
			// nextIndex past the end of the log, and one with several
			// entries must not make us resend them one at a time.
			replicatedUpTo := nextIndex + len(heartbeatArgs.Entries)
			m.leaderState.nextIndex[memberID] = replicatedUpTo
			m.leaderState.matchIndex[memberID] = replicatedUpTo - 1
			m.log.updateCommitIndex(m.getNextCommittableIndex())
		}(memberID)
	}
}
// getNextCommittableIndex returns the highest log index this instance may
// mark as committed, or the current lastCommitted when nothing new
// qualifies.
//
// Starting just after lastCommitted, it walks forward while each index is
// recorded (via matchIndex) on more than half of the cluster. A member with
// no matchIndex entry has acknowledged nothing and is not counted. Of the
// majority-replicated indices only those whose entry was created in the
// current term advance the result; entries from earlier terms become
// committed indirectly once a later current-term entry does (Raft §5.4.2).
//
// The caller must hold the instance mutex.
func (m *GraftInstance[T]) getNextCommittableIndex() int {
	majority := m.cluster.clusterSize() / 2
	currentTerm := m.electionState.currentTerm

	committable := m.log.lastCommitted
	for candidate := m.log.lastCommitted + 1; candidate < len(m.log.entries); candidate++ {
		replicas := 0
		for memberID := range m.cluster.machines {
			if matched, acked := m.leaderState.matchIndex[memberID]; acked && matched >= candidate {
				replicas++
			}
		}

		// Stop at the first index lacking a majority; the walk only ever
		// returns an index whose predecessors all had one.
		if replicas <= majority {
			break
		}
		if m.log.entries[candidate].applicationTerm == currentTerm {
			committable = candidate
		}
	}
	return committable
}
// ==== gRPC stub implementations ====
// basically all of these are blind copies from the original raft paper with minimal modification
// RequestVote is the (follower) side of the election workflow; it answers a
// candidate's request for a vote.
//
// If the candidate's term is newer than ours we first fall back to follower
// mode at that term. The vote is granted when the candidate's term is not
// behind ours, we have not voted since last entering follower mode, and the
// candidate's log is at least as up to date as ours: its last entry has a
// later term, or the same term and an index that is not smaller (Raft
// §5.4.1).
//
// The response always carries our current term. A cancelled context returns
// its error without touching any state.
func (m *GraftInstance[T]) RequestVote(ctx context.Context, args *pb.RequestVoteArgs) (*pb.RequestVoteResponse, error) {
	if contextCancelled, err := contextIsCancelled(ctx); contextCancelled {
		return nil, err
	}

	m.Lock()
	defer m.Unlock()

	// transition to follower mode if we witness our term is outdated
	if args.Term > m.electionState.currentTerm {
		m.transitionToFollowerMode( /* newTerm = */ args.Term)
	}

	// Compare against our own last entry instead of indexing our log with
	// the candidate's index: that index comes off the wire and may be -1 or
	// lie beyond the end of our log.
	// NOTE(review): relies on lastIndex/getLastEntry coping with an empty
	// log, as runElection already assumes — confirm.
	ourLastTerm := m.log.getLastEntry().applicationTerm
	logUpToDate := args.LastLogTerm > ourLastTerm ||
		(args.LastLogTerm == ourLastTerm && args.LastLogIndex >= m.log.lastIndex())

	eligibleForElection := args.Term >= m.electionState.currentTerm && logUpToDate && !m.electionState.hasVoted
	if eligibleForElection {
		m.electionState.hasVoted = true
	}

	return &pb.RequestVoteResponse{
		VoteGranted: eligibleForElection,
		CurrentTerm: m.electionState.currentTerm,
	}, nil
}
// AppendEntries is the follower side of the log replication component of
// raft.
//
// If the sender's term is newer than ours we fall back to follower mode at
// the sender's term. The request is accepted when the sender's term is not
// behind ours and either PrevLogIndex is -1 (nothing precedes the shipped
// entries) or our log has an entry at PrevLogIndex whose term equals
// PrevLogTerm. On acceptance the sender is recorded as leader, the entries
// are written starting at PrevLogIndex+1 and the commit index is updated to
// the leader's.
//
// The response always carries our current term. A cancelled context returns
// its error without touching any state.
//
// NOTE(review): the election timer is only restarted when the sender's term
// is newer; confirm whether same-term heartbeats are meant to restart it.
func (m *GraftInstance[T]) AppendEntries(ctx context.Context, args *pb.AppendEntriesArgs) (*pb.AppendEntriesResponse, error) {
	if contextCancelled, err := contextIsCancelled(ctx); contextCancelled {
		return nil, err
	}

	m.Lock()
	defer m.Unlock()

	// transition to follower mode if we witness our term is outdated; adopt
	// the sender's term, not the stale one we already had
	if args.Term > m.electionState.currentTerm {
		m.transitionToFollowerMode( /* newTerm = */ args.Term)
	}

	// PrevLogIndex comes off the wire: only index our log with it once it is
	// known to be in range, otherwise a leader that is ahead of us (or a
	// malformed request) would panic this handler.
	prevEntryMatches := args.PrevLogIndex == -1 ||
		(args.PrevLogIndex >= 0 &&
			args.PrevLogIndex < int64(len(m.log.entries)) &&
			m.log.entries[args.PrevLogIndex].applicationTerm == args.PrevLogTerm)

	requestIsAccepted := args.Term >= m.electionState.currentTerm && prevEntryMatches
	if requestIsAccepted {
		m.cluster.currentLeader = args.LeaderId
		m.log.appendEntries(
			/* applicationIndex = */ int(args.PrevLogIndex)+1,
			args.Entries,
		)
		m.log.updateCommitIndex(int(args.LeaderCommit))
	}

	return &pb.AppendEntriesResponse{
		CurrentTerm: m.electionState.currentTerm,
		Accepted:    requestIsAccepted,
	}, nil
}
// AddLogEntry asks this machine, assumed by the caller to be the leader, to
// add an operation to the log.
//
// When this machine is the recorded leader, the deserialized operation is
// appended to the local log stamped with the current term. When it is not,
// the request is dropped without an error. A cancelled context returns its
// error without touching any state.
func (m *GraftInstance[T]) AddLogEntry(ctx context.Context, args *pb.AddLogEntryArgs) (*pb.Empty, error) {
	if contextCancelled, err := contextIsCancelled(ctx); contextCancelled {
		return nil, err
	}

	m.Lock()
	defer m.Unlock()

	if m.cluster.currentLeader == m.machineId {
		m.log.entries = append(m.log.entries, logEntry[T]{
			applicationTerm: m.electionState.currentTerm,
			operation:       m.log.serializer.FromString(args.Operation),
		})
	}

	// Always hand gRPC a non-nil message to marshal on success.
	return &pb.Empty{}, nil
}
// contextIsCancelled reports, without blocking, whether ctx is already done.
// When it is, the context's error is returned alongside true; otherwise the
// result is (false, nil). The gRPC handlers use it to skip work for calls
// that have already been abandoned.
func contextIsCancelled(ctx context.Context) (bool, error) {
	if err := ctx.Err(); err != nil {
		return true, err
	}
	return false, nil
}