From 9a2bc1e2a4fad9a662107a5ab4530a1f96daa466 Mon Sep 17 00:00:00 2001 From: danl5 Date: Sat, 13 Jul 2024 13:30:47 +0800 Subject: [PATCH] Resolve the issue of failing to correctly elect a leader. --- examples/onenode/node.go | 4 +- examples/onenode/node.sh | 48 ++++++++++ pkg/consensus/consensus.go | 152 ++++++++++++++++++++++++-------- pkg/consensus/consensus_test.go | 61 ++++++++----- pkg/transport/rpc/rpc.go | 2 +- 5 files changed, 208 insertions(+), 59 deletions(-) create mode 100755 examples/onenode/node.sh diff --git a/examples/onenode/node.go b/examples/onenode/node.go index 4e31723..a9f770c 100644 --- a/examples/onenode/node.go +++ b/examples/onenode/node.go @@ -43,7 +43,9 @@ func newElect() (*goelect.Elect, error) { peerNodes = append(peerNodes, goelect.Node{Address: pa, ID: pa}) } - logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) // rpc transport rpcTransport, err := rpc.NewRPC(logger) diff --git a/examples/onenode/node.sh b/examples/onenode/node.sh new file mode 100755 index 0000000..57df545 --- /dev/null +++ b/examples/onenode/node.sh @@ -0,0 +1,48 @@ +#!/bin/bash + +node_addrs=("127.0.0.1:9981" "127.0.0.1:9982" "127.0.0.1:9983") +peers="127.0.0.1:9981,127.0.0.1:9982,127.0.0.1:9983" + +elect_binary="node" +pid_file="./node_pids" + +start_nodes() { + echo "Starting nodes..." + for node_addr in "${node_addrs[@]}"; do + echo "Starting node with address $node_addr" + log_file="${node_addr//:/_}.log" + ./$elect_binary --nodeaddr=$node_addr --peers=$peers 2>&1 | tee "$log_file" & + echo $! >> $pid_file + done + echo "All nodes started." +} + +stop_nodes() { + echo "Stopping nodes..." + if [ -f $pid_file ]; then + while read -r pid; do + echo "Stopping process $pid" + kill -9 $pid + done < $pid_file + rm $pid_file + echo "All nodes stopped." + else + echo "No nodes are running." + fi +} + +show_help() { + echo "Usage: $0 {start|stop}" +} + +case "$1" in + start) + start_nodes + ;; + stop) + stop_nodes + ;; + *) + show_help + ;; +esac \ No newline at end of file diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 4816f61..7f57e81 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -79,6 +79,19 @@ type Consensus struct { candidateChan chan struct{} // shutdownChan is used to send shutdown event shutdownChan chan struct{} + + // preEventState holds the node state before an event is processed. + // This allows for comparison and analysis of state changes after the event handling. + preEventState model.NodeState + + // inLeaderState indicates whether the current node is in the leader state. + inLeaderState bool + // inFollowerState indicates whether the current node is in the follower state. + inFollowerState bool + // inCandidateState indicates whether the current node is in the candidate state. + inCandidateState bool + // inDownState indicates whether the current node is in a down state. + inDownState bool } // Run starts the consensus @@ -157,6 +170,7 @@ func (c *Consensus) HeartBeat(args *model.HeartBeatRequest, reply *model.HeartBe c.logger.Debug("receive heartbeat", "from", args.NodeId) if c.term > args.Term { + c.logger.Info("peer term is behind self", "peer term", args.Term, "self term", c.term) // term in the request is behind this node model.HBResponse(reply, false, common.HeartbeatExpired.String()) return nil @@ -165,17 +179,17 @@ func (c *Consensus) HeartBeat(args *model.HeartBeatRequest, reply *model.HeartBe // update term of this node c.setTerm(args.Term) - switch model.NodeState(c.fsm.Current()) { - case model.NodeStateLeader: + switch { + case c.ensureState(model.NodeStateLeader): // leave leader state - c.sendEvent(model.EventLeaveLeader) - case model.NodeStateFollower: + c.sendEvent(model.NodeStateLeader, model.EventLeaveLeader) + case c.ensureState(model.NodeStateFollower): // send heartbeat to handler c.followerChan <- struct{}{} - case model.NodeStateCandidate: + case c.ensureState(model.NodeStateCandidate): // receive a new leader - c.sendEvent(model.EventNewLeader) - case model.NodeStateDown: + c.sendEvent(model.NodeStateCandidate, model.EventNewLeader) + case c.ensureState(model.NodeStateDown): } model.HBResponse(reply, true, common.HeartbeatOk.String()) @@ -191,27 +205,27 @@ func (c *Consensus) RequestVote(args *model.RequestVoteRequest, reply *model.Req return nil } - switch model.NodeState(c.fsm.Current()) { - case model.NodeStateLeader: + switch { + case c.ensureState(model.NodeStateLeader): if args.Term <= c.term { model.VoteResponse(reply, c.node.Node, false, common.VoteLeaderExist.String()) return nil } // term in the request is newer, leaves leader state - c.sendEvent(model.EventLeaveLeader) - case model.NodeStateFollower: + c.sendEvent(model.NodeStateLeader, model.EventLeaveLeader) + case c.ensureState(model.NodeStateFollower): if args.Term < c.term { model.VoteResponse(reply, c.node.Node, false, common.VoteTermExpired.String()) return nil } - case model.NodeStateCandidate: + case c.ensureState(model.NodeStateCandidate): if args.Term <= c.term { model.VoteResponse(reply, c.node.Node, false, common.VoteHaveVoted.String()) return nil } // term in the request is newer, vote and switch to follower state - c.sendEvent(model.EventNewTerm) - case model.NodeStateDown: + c.sendEvent(model.NodeStateCandidate, model.EventNewTerm) + case c.ensureState(model.NodeStateDown): } // update term cache @@ -358,9 +372,43 @@ func (c *Consensus) buildHeaders() model.Header { return model.Header{Node: c.node.Node} } +func (c *Consensus) ensureState(state model.NodeState) bool { + if model.NodeState(c.fsm.Current()) != state { + return false + } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + for { + select { + case <-ctx.Done(): + c.logger.Error("wait for state ready timeout", "state", state) + return false + default: + } + stateReady := false + switch state { + case model.NodeStateLeader: + stateReady = c.inLeaderState + case model.NodeStateFollower: + stateReady = c.inFollowerState + case model.NodeStateCandidate: + stateReady = c.inCandidateState + case model.NodeStateDown: + stateReady = c.inDownState + } + + if stateReady { + break + } + time.Sleep(500 * time.Microsecond) + } + + return true +} + func (c *Consensus) enterLeader(ctx context.Context, ev *fsm.Event) { c.logger.Info("become leader") - c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter) c.leaderChan = make(chan struct{}, 1) go func() { err := c.runLeader(ctx) @@ -369,6 +417,8 @@ func (c *Consensus) enterLeader(ctx context.Context, ev *fsm.Event) { return } }() + c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter) + c.inLeaderState = true } func (c *Consensus) runLeader(_ context.Context) error { @@ -376,12 +426,19 @@ func (c *Consensus) runLeader(_ context.Context) error { defer tk.Stop() for { + select { + case <-c.leaderChan: + c.logger.Info("leave leader") + return nil + default: + } + var errCount int // send heartbeat to followers c.sendHeartBeat(&errCount) // leaves leader state if the number of errors is more than half if errCount >= c.countVoteNode()/2+1 { - c.sendEvent(model.EventLeaveLeader) + c.sendEvent(model.NodeStateLeader, model.EventLeaveLeader) } select { @@ -397,11 +454,11 @@ func (c *Consensus) leaveLeader(_ context.Context, ev *fsm.Event) { c.logger.Info("leave leader") c.sendNodeStateTransition(model.NodeState(ev.Src), model.NodeState(ev.Dst), model.TransitionTypeLeave) close(c.leaderChan) + c.inLeaderState = false } func (c *Consensus) enterFollower(ctx context.Context, ev *fsm.Event) { c.logger.Info("become follower") - c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter) c.followerChan = make(chan struct{}, 1) go func() { err := c.runFollower(ctx) @@ -410,6 +467,8 @@ func (c *Consensus) enterFollower(ctx context.Context, ev *fsm.Event) { return } }() + c.inFollowerState = true + c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter) } func (c *Consensus) runFollower(_ context.Context) error { @@ -436,7 +495,7 @@ func (c *Consensus) runFollower(_ context.Context) error { case <-ts.C: c.logger.Info("leave follower state due to heartbeat timeout") // heartbeat timeout - c.sendEvent(model.EventHeartbeatTimeout) + c.sendEvent(model.NodeStateFollower, model.EventHeartbeatTimeout) return nil } } @@ -446,11 +505,11 @@ func (c *Consensus) leaveFollower(_ context.Context, ev *fsm.Event) { c.logger.Info("leave follower") c.sendNodeStateTransition(model.NodeState(ev.Src), model.NodeState(ev.Dst), model.TransitionTypeLeave) close(c.followerChan) + c.inFollowerState = false } func (c *Consensus) enterCandidate(ctx context.Context, ev *fsm.Event) { c.logger.Info("become candidate") - c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter) c.candidateChan = make(chan struct{}, 1) go func() { err := c.runCandidate(ctx) @@ -459,6 +518,8 @@ func (c *Consensus) enterCandidate(ctx context.Context, ev *fsm.Event) { return } }() + c.inCandidateState = true + c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter) } func (c *Consensus) runCandidate(ctx context.Context) error { @@ -507,7 +568,7 @@ func (c *Consensus) tryToBecomeLeader(_ context.Context) error { // become a leader when receive more than half of the votes if voteCount >= c.countVoteNode()/2+1 { c.logger.Info("received more than half of the votes, try to become leader") - c.sendEvent(model.EventMajorityVotes) + c.sendEvent(model.NodeStateCandidate, model.EventMajorityVotes) return nil } select { @@ -520,7 +581,7 @@ func (c *Consensus) tryToBecomeLeader(_ context.Context) error { voteCount += 1 if voteCount >= c.countVoteNode()/2+1 { c.logger.Info("received more than half of the votes, become leader") - c.sendEvent(model.EventMajorityVotes) + c.sendEvent(model.NodeStateCandidate, model.EventMajorityVotes) return nil } case <-c.candidateChan: @@ -543,10 +604,12 @@ func (c *Consensus) leaveCandidate(_ context.Context, ev *fsm.Event) { c.logger.Info("leave candidate") c.sendNodeStateTransition(model.NodeState(ev.Src), model.NodeState(ev.Dst), model.TransitionTypeLeave) close(c.candidateChan) + c.inCandidateState = false } func (c *Consensus) enterShutdown(_ context.Context, ev *fsm.Event) { c.logger.Info("become shutdown") + c.inDownState = true c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter) } @@ -554,30 +617,41 @@ func (c *Consensus) leaveShutdown(_ context.Context, ev *fsm.Event) { c.logger.Info("leave shutdown") c.sendNodeStateTransition(model.NodeState(ev.Src), model.NodeState(ev.Dst), model.TransitionTypeLeave) close(c.shutdownChan) + c.inDownState = false } -func (c *Consensus) sendEvent(ev model.NodeEvent) { +func (c *Consensus) sendEvent(currentState model.NodeState, ev model.NodeEvent) { + if currentState == c.preEventState { + c.logger.Warn("event occurring simultaneously under the same state,ignore it", + "state", currentState, "event", ev) + return + } + c.preEventState = currentState c.eventChan <- ev c.logger.Debug("node event", "event", ev.String()) } func (c *Consensus) runEventHandler() { + + handler := func(ev model.NodeEvent) { + // check if the event is legal + ok := c.fsm.Can(ev.String()) + if !ok { + c.logger.Error("wrong event", "current state", c.fsm.Current(), "event", ev.String()) + // faulty state migration is unacceptable + panic("unrecoverable error: wrong state transition") + } + + err := c.fsm.Event(context.TODO(), ev.String()) + if err != nil { + c.logger.Error("error state transition", "current state", c.fsm.Current(), "event", ev.String()) + // faulty state migration is unacceptable + panic("unrecoverable error: wrong state transition") + } + } go func() { for ev := range c.eventChan { - // check if the event is legal - ok := c.fsm.Can(ev.String()) - if !ok { - c.logger.Error("wrong event", "current state", c.fsm.Current(), "event", ev.String()) - // faulty state migration is unacceptable - panic("unrecoverable error: wrong state transition") - } - - err := c.fsm.Event(context.TODO(), ev.String()) - if err != nil { - c.logger.Error("error state transition", "current state", c.fsm.Current(), "event", ev.String()) - // faulty state migration is unacceptable - panic("unrecoverable error: wrong state transition") - } + handler(ev) } }() } @@ -638,6 +712,10 @@ func (c *Consensus) sendRequestVote(voteChan chan model.Node) error { peerID := peer.ID g.Go(func() error { + if !c.ensureState(model.NodeStateCandidate) { + return nil + } + c.logger.Info("send vote request to peer", "peer", peerID) resp := &model.Response{} // send vote request @@ -645,7 +723,7 @@ func (c *Consensus) sendRequestVote(voteChan chan model.Node) error { Header: c.buildHeaders(), CommandCode: model.RequestVote, Command: model.RequestVoteRequest{ - NodeId: peerID, + NodeId: c.node.ID, Term: c.term, NodeAddr: c.node.Address, }, diff --git a/pkg/consensus/consensus_test.go b/pkg/consensus/consensus_test.go index 6bca0ed..9166f13 100644 --- a/pkg/consensus/consensus_test.go +++ b/pkg/consensus/consensus_test.go @@ -74,11 +74,15 @@ func TestConsensus_HeartBeat(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { c := &Consensus{ - termCache: tt.fields.termCache, - logger: tt.fields.logger, - eventChan: tt.fields.eventChan, - fsm: &fsm.FSM{}, + termCache: tt.fields.termCache, + logger: tt.fields.logger, + eventChan: tt.fields.eventChan, + fsm: &fsm.FSM{}, + followerChan: make(chan struct{}, 10), + inFollowerState: true, } + c.initializeFsm() + c.fsm.SetState(model.NodeStateFollower.String()) if err := c.HeartBeat(tt.args.args, tt.args.reply); (err != nil) != tt.wantErr { t.Errorf("HeartBeat() error = %v, wantErr %v", err, tt.wantErr) } @@ -91,10 +95,13 @@ func TestConsensus_HeartBeat(t *testing.T) { func TestConsensus_RequestVote(t *testing.T) { type fields struct { - termCache *termCache - logger *slog.Logger - fsm *fsm.FSM - eventChan chan model.NodeEvent + termCache *termCache + logger *slog.Logger + fsm *fsm.FSM + eventChan chan model.NodeEvent + isLeader bool + isFollower bool + isCandidate bool } type args struct { args *model.RequestVoteRequest @@ -129,6 +136,7 @@ func TestConsensus_RequestVote(t *testing.T) { logger: slog.Default(), eventChan: make(chan model.NodeEvent, 10), fsm: leaderFsm, + isLeader: true, }, args: args{ args: &model.RequestVoteRequest{ @@ -151,6 +159,7 @@ func TestConsensus_RequestVote(t *testing.T) { logger: slog.Default(), eventChan: make(chan model.NodeEvent, 10), fsm: leaderFsm, + isLeader: true, }, args: args{ args: &model.RequestVoteRequest{ @@ -170,9 +179,10 @@ func TestConsensus_RequestVote(t *testing.T) { termCache: &termCache{ term: 1, }, - logger: slog.Default(), - eventChan: make(chan model.NodeEvent, 10), - fsm: followerFsm, + logger: slog.Default(), + eventChan: make(chan model.NodeEvent, 10), + fsm: followerFsm, + isFollower: true, }, args: args{ args: &model.RequestVoteRequest{ @@ -192,9 +202,10 @@ func TestConsensus_RequestVote(t *testing.T) { termCache: &termCache{ term: 2, }, - logger: slog.Default(), - eventChan: make(chan model.NodeEvent, 10), - fsm: followerFsm, + logger: slog.Default(), + eventChan: make(chan model.NodeEvent, 10), + fsm: followerFsm, + isFollower: true, }, args: args{ args: &model.RequestVoteRequest{ @@ -214,9 +225,10 @@ func TestConsensus_RequestVote(t *testing.T) { termCache: &termCache{ term: 1, }, - logger: slog.Default(), - eventChan: make(chan model.NodeEvent, 10), - fsm: candidateFsm, + logger: slog.Default(), + eventChan: make(chan model.NodeEvent, 10), + fsm: candidateFsm, + isCandidate: true, }, args: args{ args: &model.RequestVoteRequest{ @@ -236,9 +248,10 @@ func TestConsensus_RequestVote(t *testing.T) { termCache: &termCache{ term: 2, }, - logger: slog.Default(), - eventChan: make(chan model.NodeEvent, 10), - fsm: candidateFsm, + logger: slog.Default(), + eventChan: make(chan model.NodeEvent, 10), + fsm: candidateFsm, + isCandidate: true, }, args: args{ args: &model.RequestVoteRequest{ @@ -261,6 +274,14 @@ func TestConsensus_RequestVote(t *testing.T) { fsm: tt.fields.fsm, eventChan: tt.fields.eventChan, } + switch { + case tt.fields.isLeader: + c.inLeaderState = true + case tt.fields.isFollower: + c.inFollowerState = true + case tt.fields.isCandidate: + c.inCandidateState = true + } if err := c.RequestVote(tt.args.args, tt.args.reply); (err != nil) != tt.wantErr { t.Errorf("RequestVote() error = %v, wantErr %v", err, tt.wantErr) } diff --git a/pkg/transport/rpc/rpc.go b/pkg/transport/rpc/rpc.go index ad3cb0b..ce4254a 100644 --- a/pkg/transport/rpc/rpc.go +++ b/pkg/transport/rpc/rpc.go @@ -257,7 +257,7 @@ func (c *Client) SendRequest(nodeId string, request *model.Request, response *mo } }() - c.logger.Debug("send rpc request", "command", request.CommandCode, "to", nodeId) + c.logger.Debug("send rpc request", "command", request.CommandCode.String(), "to", nodeId) return nil }