diff --git a/raft/raft.go b/raft/raft.go index 1280aec..20914e1 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -20,6 +20,7 @@ type Raft struct { config *Config logger *zap.Logger + // lastHeartbeat stores the last time of a valid RPC received from the leader lastHeartbeat time.Time // rpcCh stores incoming RPCs @@ -58,97 +59,66 @@ func NewRaft(id uint32, peers map[uint32]Peer, persister Persister, config *Conf // RPC handlers func (r *Raft) applyCommand(req *pb.ApplyCommandRequest) (*pb.ApplyCommandResponse, error) { - if r.state != Leader { - return nil, errNotLeader - } + // TODO: (B.1)* - if not leader, reject client operation and returns `errNotLeader` - lastLogId, _ := r.getLastLog() - e := &pb.Entry{Id: lastLogId + 1, Term: r.currentTerm, Data: req.GetData()} - r.appendLogs([]*pb.Entry{e}) + // TODO: (B.1)* - create a new log entry, append to the local entries + // Hint: + // - use `getLastLog` to get the last log ID + // - use `appendLogs` to append new log - return &pb.ApplyCommandResponse{Entry: e}, nil + // TODO: (B.1)* - return the new log entry + return nil, nil } func (r *Raft) appendEntries(req *pb.AppendEntriesRequest) (*pb.AppendEntriesResponse, error) { - if req.GetTerm() < r.currentTerm { - r.logger.Info("reject append entries since current term is older") - - return &pb.AppendEntriesResponse{Term: r.currentTerm, Success: false}, nil - } + // TODO: (A.1) - reply false if term < currentTerm + // Log: r.logger.Info("reject append entries since current term is older") - r.lastHeartbeat = time.Now() + // TODO: (A.2)* - reset the `lastHeartbeat` + // Description: start from the current line, the current request is a valid RPC - // increase term if receive a newer one - if req.GetTerm() > r.currentTerm { - r.toFollower(req.GetTerm()) - r.logger.Info("increase term since receive a newer one", zap.Uint64("term", r.currentTerm)) - } + // TODO: (A.3) - if RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower + // Hint: use `toFollower` to convert to follower + // Log: r.logger.Info("increase term since receive a newer one", zap.Uint64("term", r.currentTerm)) - if req.GetTerm() == r.currentTerm && r.state != Follower { - r.toFollower(req.GetTerm()) - r.logger.Info("receive request from leader, fallback to follower", zap.Uint64("term", r.currentTerm)) - } + // TODO: (A.4) - if AppendEntries RPC received from new leader: convert to follower + // Log: r.logger.Info("receive request from leader, fallback to follower", zap.Uint64("term", r.currentTerm)) - // verify the last log entry prevLogId := req.GetPrevLogId() prevLogTerm := req.GetPrevLogTerm() if prevLogId != 0 && prevLogTerm != 0 { - log := r.getLog(prevLogId) - - if prevLogTerm != log.GetTerm() { - r.logger.Info("the given previous log from leader is missing or mismatched", - zap.Uint64("prevLogId", prevLogId), - zap.Uint64("prevLogTerm", prevLogTerm), - zap.Uint64("logTerm", log.GetTerm())) - - return &pb.AppendEntriesResponse{Term: r.currentTerm, Success: false}, nil - } + // TODO: (B.2) - reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm + // Hint: use `getLog` to get log with ID equals to prevLogId + // Log: r.logger.Info("the given previous log from leader is missing or mismatched", zap.Uint64("prevLogId", prevLogId), zap.Uint64("prevLogTerm", prevLogTerm), zap.Uint64("logTerm", log.GetTerm())) } if len(req.GetEntries()) != 0 { - // delete entries after previous log - r.deleteLogs(prevLogId) - - // append new entries - r.appendLogs(req.GetEntries()) - - r.logger.Info("receive and append new entries", - zap.Int("newEntries", len(req.GetEntries())), - zap.Int("numberOfEntries", len(r.logs)), - ) + // TODO: (B.3) - if an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it + // TODO: (B.4) - append any new entries not already in the log + // Hint: use `deleteLogs` follows by `appendLogs` + // Log: r.logger.Info("receive and append new entries", zap.Int("newEntries", len(req.GetEntries())), zap.Int("numberOfEntries", len(r.logs))) } - if req.GetLeaderCommitId() > r.commitIndex { - lastLogId, _ := r.getLastLog() - if req.GetLeaderCommitId() < lastLogId { - r.setCommitIndex(req.GetLeaderCommitId()) - } else { - r.setCommitIndex(lastLogId) - } - - r.logger.Info("update commit index from leader", zap.Uint64("commitIndex", r.commitIndex)) - go r.applyLogs(r.applyCh) - } + // TODO: (B.5) - if leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry) + // Hint: use `getLastLog` to get the index of last new entry + // Hint: use `applyLogs` to apply(commit) new logs in background + // Log: r.logger.Info("update commit index from leader", zap.Uint64("commitIndex", r.commitIndex)) return &pb.AppendEntriesResponse{Term: r.currentTerm, Success: true}, nil } func (r *Raft) requestVote(req *pb.RequestVoteRequest) (*pb.RequestVoteResponse, error) { - // reject if current term is older - if req.GetTerm() < r.currentTerm { - r.logger.Info("reject request vote since current term is older") + // TODO: (A.5) - reply false if term < currentTerm + // Log: r.logger.Info("reject request vote since current term is older") - return &pb.RequestVoteResponse{Term: r.currentTerm, VoteGranted: false}, nil - } + // TODO: (A.6) - if RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower + // Hint: use `toFollower` to convert to follower + // Log: r.logger.Info("increase term since receive a newer one", zap.Uint64("term", r.currentTerm)) - // increase term if receive a newer one - if req.GetTerm() > r.currentTerm { - r.toFollower(req.GetTerm()) - r.logger.Info("increase term since receive a newer one", zap.Uint64("term", r.currentTerm)) - } + if false { + // TODO: (A.7) - if votedFor is null or candidateId, and candidate’s log is at least as up-to-date as receiver’s log, grant vote + // Hint: (fix the condition) if already vote for another candidate, reply false - // reject if already vote for another candidate - if r.votedFor != 0 && r.votedFor != req.GetCandidateId() { r.logger.Info("reject since already vote for another candidate", zap.Uint64("term", r.currentTerm), zap.Uint32("votedFor", r.votedFor)) @@ -156,19 +126,23 @@ func (r *Raft) requestVote(req *pb.RequestVoteRequest) (*pb.RequestVoteResponse, return &pb.RequestVoteResponse{Term: r.currentTerm, VoteGranted: false}, nil } - lastLogId, lastLogTerm := r.getLastLog() - - // reject if last log entry is more up-to-date - if lastLogTerm > req.GetLastLogTerm() || (lastLogTerm == req.GetLastLogTerm() && lastLogId > req.GetLastLogId()) { + if false { + // TODO: (A.7) - if votedFor is null or candidateId, and candidate’s log is at least as up-to-date as receiver’s log, grant vote + // Hint: (fix the condition) if the local last entry is more up-to-date than the candidate's last entry, reply false + // Hint: use `getLastLog` to get the last log entry r.logger.Info("reject since last entry is more up-to-date") return &pb.RequestVoteResponse{Term: r.currentTerm, VoteGranted: false}, nil } + // TODO: (A.7) - if votedFor is null or candidateId, and candidate’s log is at least as up-to-date as receiver’s log, grant vote + // Hint: now vote should be granted, use `voteFor` to set votedFor r.voteFor(req.GetCandidateId(), false) - r.lastHeartbeat = time.Now() r.logger.Info("vote for another candidate", zap.Uint32("votedFor", r.votedFor)) + // TODO: (A.8)* - reset the `lastHeartbeat` + // Description: start from the current line, the current request is a valid RPC + return &pb.RequestVoteResponse{Term: r.currentTerm, VoteGranted: true}, nil } @@ -234,7 +208,8 @@ func (r *Raft) runFollower(ctx context.Context) { } func (r *Raft) handleFollowerHeartbeatTimeout() { - r.toCandidate() + // TODO: (A.9) - if election timeout elapses without receiving AppendEntries RPC from current leader or granting vote to candidate: convert to candidate + // Hint: use `toCandidate` to convert to candidate r.logger.Info("heartbeat timeout, change state from follower to candidate") } @@ -285,8 +260,9 @@ func (r *Raft) runCandidate(ctx context.Context) { } func (r *Raft) voteForSelf(grantedVotes *int) { - r.voteFor(r.id, true) - (*grantedVotes)++ + // TODO: (A.10) increment currentTerm + // TODO: (A.10) vote for self + // Hint: use `voteFor` to vote for self r.logger.Info("vote for self", zap.Uint64("term", r.currentTerm)) } @@ -294,48 +270,39 @@ func (r *Raft) voteForSelf(grantedVotes *int) { func (r *Raft) broadcastRequestVote(ctx context.Context, voteCh chan *voteResult) { r.logger.Info("broadcast request vote", zap.Uint64("term", r.currentTerm)) - lastLogId, lastLogTerm := r.getLastLog() - req := &pb.RequestVoteRequest{ - Term: r.currentTerm, - CandidateId: r.id, - LastLogId: lastLogId, - LastLogTerm: lastLogTerm, + // TODO: (A.11) - send RequestVote RPCs to all other servers (set all fields of `req`) + // Hint: use `getLastLog` to get the last log entry } + // TODO: (A.11) - send RequestVote RPCs to all other servers (modify the code to send `RequestVote` RPCs in parallel) for peerId, peer := range r.peers { peerId := peerId peer := peer - go func() { - resp, err := peer.RequestVote(ctx, req) - if err != nil { - r.logger.Error("fail to send RequestVote RPC", zap.Error(err), zap.Uint32("peer", peerId)) - return - } + resp, err := peer.RequestVote(ctx, req) + if err != nil { + r.logger.Error("fail to send RequestVote RPC", zap.Error(err), zap.Uint32("peer", peerId)) + return + } - voteCh <- &voteResult{RequestVoteResponse: resp, peerId: peerId} - }() + voteCh <- &voteResult{RequestVoteResponse: resp, peerId: peerId} } } func (r *Raft) handleVoteResult(vote *voteResult, grantedVotes *int, votesNeeded int) { - if vote.GetTerm() > r.currentTerm { - r.toFollower(vote.GetTerm()) - r.logger.Info("receive new term on RequestVote response, fallback to follower", zap.Uint32("peer", vote.peerId)) - - return - } + // TODO: (A.12) - if RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower + // Hint: use `toFollower` to convert to follower + // Log: r.logger.Info("receive new term on RequestVote response, fallback to follower", zap.Uint32("peer", vote.peerId)) if vote.VoteGranted { (*grantedVotes)++ r.logger.Info("vote granted", zap.Uint32("peer", vote.peerId), zap.Int("grantedVote", (*grantedVotes))) } - if (*grantedVotes) >= votesNeeded { - r.toLeader() - r.logger.Info("election won", zap.Int("grantedVote", (*grantedVotes)), zap.Uint64("term", r.currentTerm)) - } + // TODO: (A.13) - if votes received from majority of servers: become leader + // Log: r.logger.Info("election won", zap.Int("grantedVote", (*grantedVotes)), zap.Uint64("term", r.currentTerm)) + // Hint: use `toLeader` to convert to leader } // leader related @@ -384,96 +351,60 @@ func (r *Raft) broadcastAppendEntries(ctx context.Context, appendEntriesResultCh peerId := peerId peer := peer - prevLog := r.getLog(r.nextIndex[peerId] - 1) - entries := r.getLogs(r.nextIndex[peerId]) - - req := &pb.AppendEntriesRequest{ - Term: r.currentTerm, - LeaderId: r.id, - LeaderCommitId: r.commitIndex, - Entries: entries, + // TODO: (A.14) - send initial empty AppendEntries RPCs (heartbeat) to each server; repeat during idle periods to prevent election timeouts + // Hint: set `req` with the correct fields (entries, prevLogId, prevLogTerm can be ignored for heartbeat) + // TODO: (B.6) - send AppendEntries RPC with log entries starting at nextIndex + // Hint: set `req` with the correct fields (entries, prevLogId and prevLogTerm MUST be set) + // Hint: use `getLog` to get specific log, `getLogs` to get all logs after and include the specific log Id + // Log: r.logger.Debug("send append entries", zap.Uint32("peer", peerId), zap.Any("request", req), zap.Int("entries", len(entries))) + req := &pb.AppendEntriesRequest{} + + // TODO: (A.14) & (B.6) + // Hint: modify the code to send `AppendEntries` RPCs in parallel + resp, err := peer.AppendEntries(ctx, req) + if err != nil { + r.logger.Error("fail to send AppendEntries RPC", zap.Error(err), zap.Uint32("peer", peerId)) + // connection issue, should not be handled + return } - if prevLog != nil { - req.PrevLogId = prevLog.GetId() - req.PrevLogTerm = prevLog.GetTerm() + appendEntriesResultCh <- &appendEntriesResult{ + AppendEntriesResponse: resp, + req: req, + peerId: peerId, } - - // r.logger.Debug("send append entries", zap.Uint32("peer", peerId), zap.Any("request", req), zap.Int("entries", len(entries))) - - go func() { - resp, err := peer.AppendEntries(ctx, req) - if err != nil { - r.logger.Error("fail to send AppendEntries RPC", zap.Error(err), zap.Uint32("peer", peerId)) - // connection issue, should not be handled - return - } - - appendEntriesResultCh <- &appendEntriesResult{ - AppendEntriesResponse: resp, - req: req, - peerId: peerId, - } - }() } } func (r *Raft) handleAppendEntriesResult(result *appendEntriesResult) { - peerId := result.peerId - logger := r.logger.With(zap.Uint32("peer", peerId)) - - if result.GetTerm() > r.currentTerm { - r.toFollower(result.GetTerm()) - logger.Info("receive new term on AppendEntries response, fallback to follower") - - return - } + // TODO: (A.15) - if RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower + // Hint: use `toFollower` to convert to follower + // Log: r.logger.Info("receive new term on AppendEntries response, fallback to follower", zap.Uint32("peer", result.peerId)) entries := result.req.GetEntries() if !result.GetSuccess() { - // if failed, decrease `nextIndex` and retry - nextIndex := r.nextIndex[peerId] - 1 - matchIndex := r.matchIndex[peerId] - r.setNextAndMatchIndex(peerId, nextIndex, matchIndex) - - logger.Info("append entries failed, decrease next index", - zap.Uint64("nextIndex", nextIndex), - zap.Uint64("matchIndex", matchIndex)) + // TODO: (B.7) - if AppendEntries fails because of log inconsistency: decrease nextIndex and retry + // Hint: use `setNextAndMatchIndex` to decrease nextIndex + // Log: logger.Info("append entries failed, decrease next index", zap.Uint64("nextIndex", nextIndex), zap.Uint64("matchIndex", matchIndex)) } else if len(entries) != 0 { - // if successful and with log entries, update `matchIndex` and `nextIndex` for the follower - matchIndex := entries[len(entries)-1].GetId() - nextIndex := matchIndex + 1 - r.setNextAndMatchIndex(peerId, nextIndex, matchIndex) - - logger.Info("append entries successfully, set next index and match index", - zap.Uint64("nextIndex", nextIndex), - zap.Uint64("matchIndex", matchIndex)) + // TODO: (B.8) - if successful: update nextIndex and matchIndex for follower + // Hint: use `setNextAndMatchIndex` to update nextIndex and matchIndex + // Log: logger.Info("append entries successfully, set next index and match index", zap.Uint32("peer", result.peerId), zap.Uint64("nextIndex", nextIndex), zap.Uint64("matchIndex", matchIndex)) } replicasNeeded := (len(r.peers)+1)/2 + 1 logs := r.getLogs(r.commitIndex + 1) for i := len(logs) - 1; i >= 0; i-- { - log := logs[i] - if log.GetTerm() != r.currentTerm { - continue - } + // TODO: (B.9) if there exiss an N such that N > commitIndex, a majority of matchIndex[i] >= N, and log[N].term == currentTerm: set commitIndex = N + // Hint: find if such N exists + // Hint: if such N exists, use `setCommitIndex` to set commit index + // Hint: if such N exists, use `applyLogs` to apply logs replicas := 1 - for peerId := range r.peers { - if r.matchIndex[peerId] >= log.GetId() { - replicas++ - } - } if replicas >= replicasNeeded { - r.setCommitIndex(log.GetId()) - r.logger.Info("found new logs committed, apply new logs", zap.Uint64("commitIndex", r.commitIndex)) - - go r.applyLogs(r.applyCh) - - break } } }