-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathraft.go
172 lines (154 loc) · 4.53 KB
/
raft.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package raft
import (
	"fmt"
	"math/rand"
	"sync"
	"time"

	log "github.com/sirupsen/logrus"
)
// CMState is the role a ConsensusModule currently plays in the Raft state
// machine. Its zero value is Follower, so every server starts as a follower.
//
//   - Follower:  the default role; runs an election timer.
//   - Candidate: entered when the election timer expires; asks peers for votes.
//   - Leader:    entered after receiving votes from a majority of the cluster.
//   - Dead:      terminal role; neither the election timer nor the leader
//     heartbeat loop keeps running in it.
type CMState int

// The possible CMState values. They are declared with iota so that every
// constant (not just the first) has type CMState and therefore gets the
// String method below; with explicit "= N" values only Follower was typed.
const (
	Follower CMState = iota
	Candidate
	Leader
	Dead
)

// String implements fmt.Stringer. Values outside the known set are rendered
// as "CMState(N)" rather than panicking, so logging an unexpected state can
// never crash the server.
func (state CMState) String() string {
	switch state {
	case Follower:
		return "Follower"
	case Candidate:
		return "Candidate"
	case Leader:
		return "Leader"
	case Dead:
		return "Dead"
	default:
		return fmt.Sprintf("CMState(%d)", int(state))
	}
}
// ConsensusModule is the Raft consensus state of a single server.
// It is meant to receive commands from clients, add them to its log and make
// sure every log entry is safely replicated to the other ConsensusModules;
// the fields below currently cover only leader election (term, vote, role and
// heartbeat timing) — no log is stored yet.
type ConsensusModule struct {
id int // server id
peersIds []int // IDs of the servers in the cluster; an entry equal to id is skipped when sending RPCs
server *Server // used to send RPCs to other servers
// mu guards the mutable state below (currentTerm, votedFor, state,
// leaderHeartbeat); the election timer and the leader loop read it under mu.
mu sync.Mutex
// Persistent state on all servers (in Raft terms; nothing in this file
// writes these to stable storage yet)
currentTerm int // latest term this server has seen (0 on first boot, increases monotonically)
votedFor int // candidate ID that received our vote in currentTerm, or -1 if none (as set by NewConsensusModule)
// Volatile state on all servers
state CMState // current role of the server (Follower, Candidate, Leader or Dead)
leaderHeartbeat time.Time // last time we heard from a leader or started an election; the election timeout is measured from here
}
// NewConsensusModule builds the consensus state for server id and starts it.
//
// The module begins as a Follower in term 0 with no vote cast (votedFor is
// -1), and its heartbeat clock is set to "now" so the first election timeout
// is measured from construction. peersIds is stored as given (not copied);
// server is the handle used to reach the peers.
//
// The election timer goroutine is launched before returning.
func NewConsensusModule(id int, peersIds []int, server *Server) *ConsensusModule {
	module := &ConsensusModule{
		id:              id,
		peersIds:        peersIds,
		server:          server,
		state:           Follower,
		votedFor:        -1,
		leaderHeartbeat: time.Now(),
	}

	// Only start the timer once every field is initialised, because it reads
	// the module's state (under the lock) from its own goroutine.
	go module.runElectionTimer()

	return module
}
// runElectionTimer drives the election timeout for this server. It is meant
// to run in its own goroutine.
//
// It picks one randomized timeout and records the term it started in. Then it
// polls every 10ms and stops (without acting) in either of these cases:
//   - the server is no longer a Follower or Candidate (e.g. it became Leader
//     or Dead);
//   - currentTerm has moved on from the term the timer started in, which makes
//     this timer stale.
//
// Candidates keep their timer as well: per Raft §5.2, an election that ends
// without a winner (e.g. a split vote) must be retried after another timeout.
//
// If nothing has refreshed leaderHeartbeat for longer than the timeout, it
// starts a new election and exits.
func (cm *ConsensusModule) runElectionTimer() {
	timeoutDuration := cm.electionTimeout()
	cm.mu.Lock()
	termStarted := cm.currentTerm
	cm.mu.Unlock()

	// Poll at a fine granularity rather than sleeping for the whole timeout so
	// that role/term changes and fresh heartbeats are noticed promptly.
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for range ticker.C {
		cm.mu.Lock()
		if cm.state != Candidate && cm.state != Follower {
			log.Debugf("Server %v in state %v, stopping election timer", cm.id, cm.state)
			cm.mu.Unlock()
			return
		}
		// The term changed since this timer started (for example,
		// startElection incremented it), so this timer no longer applies.
		if termStarted != cm.currentTerm {
			log.Debugf("Server %v term has changed %d to %d", cm.id, termStarted, cm.currentTerm)
			cm.mu.Unlock()
			return
		}
		// Start an election if we haven't heard from a leader for a while.
		if time.Since(cm.leaderHeartbeat) > timeoutDuration {
			// startElection expects cm.mu to be held and does not release it,
			// so unlock afterwards; returning with the lock held would
			// deadlock every other user of cm.mu.
			cm.startElection()
			cm.mu.Unlock()
			return
		}
		cm.mu.Unlock()
	}
}
// startElection turns this server into a Candidate for a new term and asks
// its peers for votes.
//
// It must be called with cm.mu held (runElectionTimer calls it while holding
// the lock) and does not release the lock itself.
//
// It increments currentTerm, votes for itself and resets leaderHeartbeat so the
// next timeout is measured from now. It then fires RequestVote RPCs and starts
// a fresh election timer for the new term. Per Raft §5.2, a Candidate whose
// election times out without a winner must start another election, which
// requires a timer running in the Candidate state. runElectionTimer must keep
// running for Candidates for that retry to happen.
func (cm *ConsensusModule) startElection() {
	cm.state = Candidate
	cm.currentTerm++
	cm.votedFor = cm.id
	cm.leaderHeartbeat = time.Now()
	cm.sendRequestVote()
	// The new timer blocks on cm.mu until our caller releases it, and then
	// records the incremented term as the one it is guarding.
	go cm.runElectionTimer()
}
// electionTimeout returns a fresh randomized election timeout in the
// half-open range [150ms, 300ms), the range suggested by the Raft paper.
// Randomizing per timer makes it unlikely that several servers time out
// together and split the vote.
func (cm *ConsensusModule) electionTimeout() time.Duration {
	const (
		baseMillis   = 150
		jitterMillis = 150
	)
	millis := baseMillis + rand.Intn(jitterMillis)
	return time.Millisecond * time.Duration(millis)
}
// sendRequestVote starts one goroutine per peer (skipping this server's own
// ID) to carry a RequestVote RPC.
//
// TODO: the per-peer goroutine is still a stub, so no RPC is sent and no
// votes are counted yet. The intended flow is:
//   - send RequestVote to the peer;
//   - count granted votes and become Leader once a majority is reached;
//   - reset the election timer if a heartbeat from a leader arrives.
func (cm *ConsensusModule) sendRequestVote() {
	for i := range cm.peersIds {
		target := cm.peersIds[i]
		if target != cm.id {
			go func(peerID int) {
				// Stub: the RequestVote RPC to peerID belongs here.
			}(target)
		}
	}
}
// startLeader switches this server to the Leader state and launches a
// background goroutine that keeps the followers informed. The goroutine sends
// heartbeats to every peer immediately and again after every 50ms tick, as
// long as the server is still Leader. It exits at the first tick where it
// finds a different state.
//
// NOTE(review): cm.state is written here without taking cm.mu, while the
// heartbeat goroutine reads it under cm.mu. Presumably callers hold cm.mu;
// confirm at the call sites.
func (cm *ConsensusModule) startLeader() {
	cm.state = Leader

	go func() {
		tick := time.NewTicker(50 * time.Millisecond)
		defer tick.Stop()

		cm.sendHeartbeats()
		for range tick.C {
			cm.mu.Lock()
			stillLeader := cm.state == Leader
			cm.mu.Unlock()
			if !stillLeader {
				return
			}
			cm.sendHeartbeats()
		}
	}()
}
// sendHeartbeats starts one goroutine per peer (skipping this server's own
// ID) to carry an AppendEntries heartbeat.
//
// TODO: the per-peer work is still a stub and sends nothing. When
// implemented, a peer that receives the heartbeat resets its election timer.
func (cm *ConsensusModule) sendHeartbeats() {
	heartbeat := func(peerID int) {
		// Stub: the AppendEntries RPC to peerID belongs here.
	}
	for _, target := range cm.peersIds {
		if target == cm.id {
			continue // never send a heartbeat to ourselves
		}
		go heartbeat(target)
	}
}