From 8949c75cbc15651e8902ce6c8439f2de92167f54 Mon Sep 17 00:00:00 2001 From: danl5 Date: Sun, 12 May 2024 17:10:39 +0800 Subject: [PATCH 1/2] Adjust the node state structure --- examples/onenode/node.go | 6 +++-- pkg/consensus/consensus.go | 41 ++++++++++++++++++--------------- pkg/consensus/consensus_rpc.go | 10 ++++---- pkg/consensus/consensus_test.go | 26 ++++++++++++++------- pkg/model/command.go | 7 +++++- pkg/model/node.go | 1 - 6 files changed, 56 insertions(+), 35 deletions(-) diff --git a/examples/onenode/node.go b/examples/onenode/node.go index 0f26cb8..6306c1a 100644 --- a/examples/onenode/node.go +++ b/examples/onenode/node.go @@ -115,12 +115,14 @@ func main() { for addr, n := range cs.Nodes { fmt.Println(addr, n.State.String()) } - + fmt.Println() leaderNode, _ := e.Leader() fmt.Println("Leader:", leaderNode) + fmt.Println() isLeader := e.IsLeader() - fmt.Println("Leader:", isLeader) + fmt.Println("IsLeader:", isLeader) + fmt.Println() } } } diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 3d32358..1f34c76 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -108,7 +108,7 @@ func (c *Consensus) IsLeader() bool { // Leader retrieves the current leader node from the cluster. // It returns an error if no leader is found or there's an issue fetching the cluster state. -func (c *Consensus) Leader() (*model.Node, error) { +func (c *Consensus) Leader() (*model.ElectNode, error) { clusterState, err := c.ClusterState() if err != nil { c.logger.Error("failed to get cluster state", "error", err.Error()) @@ -128,7 +128,12 @@ func (c *Consensus) Leader() (*model.Node, error) { func (c *Consensus) ClusterState() (*model.ClusterState, error) { g := errgroup.Group{} clusterState := &model.ClusterState{ - Nodes: map[string]*model.ElectNode{c.node.Address: &c.node}, + Nodes: map[string]*model.NodeWithState{ + c.node.ID: { + Node: c.node, + State: model.NodeState(c.fsm.Current()), + }, + }, } stateMap := sync.Map{} for rpcClient := range c.rpcClients.iterate() { @@ -139,7 +144,7 @@ func (c *Consensus) ClusterState() (*model.ClusterState, error) { } g.Go(func() error { - resp := model.ElectNode{} + resp := model.NodeWithState{} // send state request err := clt.Call("RpcHandler.State", nil, &resp) if err != nil { @@ -147,7 +152,7 @@ func (c *Consensus) ClusterState() (*model.ClusterState, error) { return fmt.Errorf("failed to get node state, peer %s, err: %s", nodeAddr, err.Error()) } - stateMap.Store(resp.Address, &resp) + stateMap.Store(resp.Node.ID, &resp) return nil }) } @@ -158,7 +163,7 @@ func (c *Consensus) ClusterState() (*model.ClusterState, error) { } stateMap.Range(func(key, value any) bool { - clusterState.Nodes[key.(string)] = value.(*model.ElectNode) + clusterState.Nodes[key.(string)] = value.(*model.NodeWithState) return true }) @@ -166,8 +171,11 @@ func (c *Consensus) ClusterState() (*model.ClusterState, error) { } // CurrentState returns the current election node state. -func (c *Consensus) CurrentState() model.ElectNode { - return c.node +func (c *Consensus) CurrentState() model.NodeWithState { + return model.NodeWithState{ + State: model.NodeState(c.fsm.Current()), + Node: c.node, + } } // Visualize returns a visualization of the current consensus state machine in Graphviz format. @@ -175,10 +183,9 @@ func (c *Consensus) Visualize() string { return fsm.Visualize(c.fsm) } -func (c *Consensus) enterLeader(ctx context.Context, ev *fsm.Event) { +func (c *Consensus) enterLeader(_ context.Context, ev *fsm.Event) { c.logger.Info("become leader") c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter) - c.node.State = model.NodeStateLeader c.leaderChan = make(chan struct{}, 1) go func() { err := c.runLeader() @@ -211,16 +218,15 @@ func (c *Consensus) runLeader() error { } } -func (c *Consensus) leaveLeader(ctx context.Context, ev *fsm.Event) { +func (c *Consensus) leaveLeader(_ context.Context, ev *fsm.Event) { c.logger.Info("leave leader") c.sendNodeStateTransition(model.NodeState(ev.Src), model.NodeState(ev.Dst), model.TransitionTypeLeave) close(c.leaderChan) } -func (c *Consensus) enterFollower(ctx context.Context, ev *fsm.Event) { +func (c *Consensus) enterFollower(_ context.Context, ev *fsm.Event) { c.logger.Info("become follower") c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter) - c.node.State = model.NodeStateFollower c.followerChan = make(chan struct{}, 1) go func() { err := c.runFollower() @@ -259,16 +265,15 @@ func (c *Consensus) runFollower() error { } } -func (c *Consensus) leaveFollower(ctx context.Context, ev *fsm.Event) { +func (c *Consensus) leaveFollower(_ context.Context, ev *fsm.Event) { c.logger.Info("leave follower") c.sendNodeStateTransition(model.NodeState(ev.Src), model.NodeState(ev.Dst), model.TransitionTypeLeave) close(c.followerChan) } -func (c *Consensus) enterCandidate(ctx context.Context, ev *fsm.Event) { +func (c *Consensus) enterCandidate(_ context.Context, ev *fsm.Event) { c.logger.Info("become candidate") c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter) - c.node.State = model.NodeStateCandidate c.candidateChan = make(chan struct{}, 1) go func() { err := c.runCandidate() @@ -357,18 +362,18 @@ func (c *Consensus) tryToBecomeLeader() error { } } -func (c *Consensus) leaveCandidate(ctx context.Context, ev *fsm.Event) { +func (c *Consensus) leaveCandidate(_ context.Context, ev *fsm.Event) { c.logger.Info("leave candidate") c.sendNodeStateTransition(model.NodeState(ev.Src), model.NodeState(ev.Dst), model.TransitionTypeLeave) close(c.candidateChan) } -func (c *Consensus) enterShutdown(ctx context.Context, ev *fsm.Event) { +func (c *Consensus) enterShutdown(_ context.Context, ev *fsm.Event) { c.logger.Info("become shutdown") c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter) } -func (c *Consensus) leaveShutdown(ctx context.Context, ev *fsm.Event) { +func (c *Consensus) leaveShutdown(_ context.Context, ev *fsm.Event) { c.logger.Info("leave shutdown") c.sendNodeStateTransition(model.NodeState(ev.Src), model.NodeState(ev.Dst), model.TransitionTypeLeave) close(c.shutdownChan) diff --git a/pkg/consensus/consensus_rpc.go b/pkg/consensus/consensus_rpc.go index b47e25b..a093836 100644 --- a/pkg/consensus/consensus_rpc.go +++ b/pkg/consensus/consensus_rpc.go @@ -39,7 +39,7 @@ func (c *RpcHandler) HeartBeat(args *model.HeartBeatRequest, reply *model.HeartB // update term of this node c.setTerm(args.Term) - switch c.node.State { + switch model.NodeState(c.fsm.Current()) { case model.NodeStateLeader: // leave leader state c.sendEvent(model.EventLeaveLeader) @@ -59,13 +59,13 @@ func (c *RpcHandler) HeartBeat(args *model.HeartBeatRequest, reply *model.HeartB // RequestVote handle vote request from peer node func (c *RpcHandler) RequestVote(args *model.RequestVoteRequest, reply *model.RequestVoteResponse) error { c.logger.Info("receive vote request", "from", args.NodeAddr, "term", args.Term, "current term", c.term) - // return when novote is true + // return when no-vote is true if c.node.NoVote { model.VoteResponse(reply, c.node.Node, false, common.VoteNoVoteNode.String()) return nil } - switch c.node.State { + switch model.NodeState(c.fsm.Current()) { case model.NodeStateLeader: if args.Term <= c.term { model.VoteResponse(reply, c.node.Node, false, common.VoteLeaderExist.String()) @@ -98,13 +98,13 @@ func (c *RpcHandler) RequestVote(args *model.RequestVoteRequest, reply *model.Re } // Ping handles ping request from peer node -func (c *RpcHandler) Ping(args struct{}, reply *string) error { +func (c *RpcHandler) Ping(_ struct{}, reply *string) error { *reply = "pong" return nil } // State return current node state -func (c *RpcHandler) State(args struct{}, reply *model.ElectNode) error { +func (c *RpcHandler) State(_ struct{}, reply *model.NodeWithState) error { *reply = c.CurrentState() return nil } diff --git a/pkg/consensus/consensus_test.go b/pkg/consensus/consensus_test.go index 519e50a..1be1f24 100644 --- a/pkg/consensus/consensus_test.go +++ b/pkg/consensus/consensus_test.go @@ -4,6 +4,7 @@ import ( "log/slog" "testing" + "github.com/looplab/fsm" "github.com/stretchr/testify/assert" "github.com/danl5/goelect/pkg/common" @@ -94,7 +95,7 @@ func TestConsensus_RequestVote(t *testing.T) { type fields struct { termCache *termCache logger log.Logger - node model.ElectNode + fsm *fsm.FSM eventChan chan model.NodeEvent } type args struct { @@ -105,6 +106,15 @@ func TestConsensus_RequestVote(t *testing.T) { vote bool message string } + leaderFsm := &fsm.FSM{} + leaderFsm.SetState(model.NodeStateLeader.String()) + + followerFsm := &fsm.FSM{} + followerFsm.SetState(model.NodeStateFollower.String()) + + candidateFsm := &fsm.FSM{} + candidateFsm.SetState(model.NodeStateCandidate.String()) + tests := []struct { name string fields fields @@ -120,7 +130,7 @@ func TestConsensus_RequestVote(t *testing.T) { }, logger: slog.Default(), eventChan: make(chan model.NodeEvent, 10), - node: model.ElectNode{State: model.NodeStateLeader}, + fsm: leaderFsm, }, args: args{ args: &model.RequestVoteRequest{ @@ -142,7 +152,7 @@ func TestConsensus_RequestVote(t *testing.T) { }, logger: slog.Default(), eventChan: make(chan model.NodeEvent, 10), - node: model.ElectNode{State: model.NodeStateLeader}, + fsm: leaderFsm, }, args: args{ args: &model.RequestVoteRequest{ @@ -164,7 +174,7 @@ func TestConsensus_RequestVote(t *testing.T) { }, logger: slog.Default(), eventChan: make(chan model.NodeEvent, 10), - node: model.ElectNode{State: model.NodeStateFollower}, + fsm: followerFsm, }, args: args{ args: &model.RequestVoteRequest{ @@ -186,7 +196,7 @@ func TestConsensus_RequestVote(t *testing.T) { }, logger: slog.Default(), eventChan: make(chan model.NodeEvent, 10), - node: model.ElectNode{State: model.NodeStateFollower}, + fsm: followerFsm, }, args: args{ args: &model.RequestVoteRequest{ @@ -208,7 +218,7 @@ func TestConsensus_RequestVote(t *testing.T) { }, logger: slog.Default(), eventChan: make(chan model.NodeEvent, 10), - node: model.ElectNode{State: model.NodeStateCandidate}, + fsm: candidateFsm, }, args: args{ args: &model.RequestVoteRequest{ @@ -230,7 +240,7 @@ func TestConsensus_RequestVote(t *testing.T) { }, logger: slog.Default(), eventChan: make(chan model.NodeEvent, 10), - node: model.ElectNode{State: model.NodeStateCandidate}, + fsm: candidateFsm, }, args: args{ args: &model.RequestVoteRequest{ @@ -251,7 +261,7 @@ func TestConsensus_RequestVote(t *testing.T) { Consensus: &Consensus{ termCache: tt.fields.termCache, logger: tt.fields.logger, - node: tt.fields.node, + fsm: tt.fields.fsm, eventChan: tt.fields.eventChan, }, } diff --git a/pkg/model/command.go b/pkg/model/command.go index a767b98..374877b 100644 --- a/pkg/model/command.go +++ b/pkg/model/command.go @@ -37,7 +37,12 @@ func VoteResponse(resp *RequestVoteResponse, node Node, vote bool, msg string) { resp.Message = msg } +type NodeWithState struct { + State NodeState `json:"state"` + Node ElectNode `json:"node"` +} + // ClusterState represents the state of a cluster, including the nodes that make up the cluster. type ClusterState struct { - Nodes map[string]*ElectNode `json:"nodes"` + Nodes map[string]*NodeWithState `json:"nodes"` } diff --git a/pkg/model/node.go b/pkg/model/node.go index 8eb06f0..4e85f27 100644 --- a/pkg/model/node.go +++ b/pkg/model/node.go @@ -43,6 +43,5 @@ func (n *Node) Validate() error { type ElectNode struct { Node - State NodeState NoVote bool } From 938181d6223ecfacb10882184fe3777524555ad7 Mon Sep 17 00:00:00 2001 From: danl5 Date: Sun, 12 May 2024 17:14:28 +0800 Subject: [PATCH 2/2] Adjust the node state structure --- pkg/consensus/consensus_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/consensus/consensus_test.go b/pkg/consensus/consensus_test.go index 1be1f24..bc10632 100644 --- a/pkg/consensus/consensus_test.go +++ b/pkg/consensus/consensus_test.go @@ -79,6 +79,7 @@ func TestConsensus_HeartBeat(t *testing.T) { termCache: tt.fields.termCache, logger: tt.fields.logger, eventChan: tt.fields.eventChan, + fsm: &fsm.FSM{}, }, } if err := c.HeartBeat(tt.args.args, tt.args.reply); (err != nil) != tt.wantErr {