Skip to content

Commit

Permalink
Merge pull request #22 from danl5/refine_node_state
Browse files Browse the repository at this point in the history
Adjust the node state structure
  • Loading branch information
danl5 authored May 12, 2024
2 parents c17d991 + 938181d commit 4554270
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 35 deletions.
6 changes: 4 additions & 2 deletions examples/onenode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,14 @@ func main() {
for addr, n := range cs.Nodes {
fmt.Println(addr, n.State.String())
}

fmt.Println()
leaderNode, _ := e.Leader()
fmt.Println("Leader:", leaderNode)

fmt.Println()
isLeader := e.IsLeader()
fmt.Println("Leader:", isLeader)
fmt.Println("IsLeader:", isLeader)
fmt.Println()
}
}
}
41 changes: 23 additions & 18 deletions pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (c *Consensus) IsLeader() bool {

// Leader retrieves the current leader node from the cluster.
// It returns an error if no leader is found or there's an issue fetching the cluster state.
func (c *Consensus) Leader() (*model.Node, error) {
func (c *Consensus) Leader() (*model.ElectNode, error) {
clusterState, err := c.ClusterState()
if err != nil {
c.logger.Error("failed to get cluster state", "error", err.Error())
Expand All @@ -128,7 +128,12 @@ func (c *Consensus) Leader() (*model.Node, error) {
func (c *Consensus) ClusterState() (*model.ClusterState, error) {
g := errgroup.Group{}
clusterState := &model.ClusterState{
Nodes: map[string]*model.ElectNode{c.node.Address: &c.node},
Nodes: map[string]*model.NodeWithState{
c.node.ID: {
Node: c.node,
State: model.NodeState(c.fsm.Current()),
},
},
}
stateMap := sync.Map{}
for rpcClient := range c.rpcClients.iterate() {
Expand All @@ -139,15 +144,15 @@ func (c *Consensus) ClusterState() (*model.ClusterState, error) {
}

g.Go(func() error {
resp := model.ElectNode{}
resp := model.NodeWithState{}
// send state request
err := clt.Call("RpcHandler.State", nil, &resp)
if err != nil {
c.logger.Error("failed to get node state", "peer", nodeAddr)
return fmt.Errorf("failed to get node state, peer %s, err: %s", nodeAddr, err.Error())
}

stateMap.Store(resp.Address, &resp)
stateMap.Store(resp.Node.ID, &resp)
return nil
})
}
Expand All @@ -158,27 +163,29 @@ func (c *Consensus) ClusterState() (*model.ClusterState, error) {
}

stateMap.Range(func(key, value any) bool {
clusterState.Nodes[key.(string)] = value.(*model.ElectNode)
clusterState.Nodes[key.(string)] = value.(*model.NodeWithState)
return true
})

return clusterState, err
}

// CurrentState returns the current election node state.
func (c *Consensus) CurrentState() model.ElectNode {
return c.node
func (c *Consensus) CurrentState() model.NodeWithState {
return model.NodeWithState{
State: model.NodeState(c.fsm.Current()),
Node: c.node,
}
}

// Visualize returns a visualization of the current consensus state machine in Graphviz format.
func (c *Consensus) Visualize() string {
return fsm.Visualize(c.fsm)
}

func (c *Consensus) enterLeader(ctx context.Context, ev *fsm.Event) {
func (c *Consensus) enterLeader(_ context.Context, ev *fsm.Event) {
c.logger.Info("become leader")
c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter)
c.node.State = model.NodeStateLeader
c.leaderChan = make(chan struct{}, 1)
go func() {
err := c.runLeader()
Expand Down Expand Up @@ -211,16 +218,15 @@ func (c *Consensus) runLeader() error {
}
}

func (c *Consensus) leaveLeader(ctx context.Context, ev *fsm.Event) {
func (c *Consensus) leaveLeader(_ context.Context, ev *fsm.Event) {
c.logger.Info("leave leader")
c.sendNodeStateTransition(model.NodeState(ev.Src), model.NodeState(ev.Dst), model.TransitionTypeLeave)
close(c.leaderChan)
}

func (c *Consensus) enterFollower(ctx context.Context, ev *fsm.Event) {
func (c *Consensus) enterFollower(_ context.Context, ev *fsm.Event) {
c.logger.Info("become follower")
c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter)
c.node.State = model.NodeStateFollower
c.followerChan = make(chan struct{}, 1)
go func() {
err := c.runFollower()
Expand Down Expand Up @@ -259,16 +265,15 @@ func (c *Consensus) runFollower() error {
}
}

func (c *Consensus) leaveFollower(ctx context.Context, ev *fsm.Event) {
func (c *Consensus) leaveFollower(_ context.Context, ev *fsm.Event) {
c.logger.Info("leave follower")
c.sendNodeStateTransition(model.NodeState(ev.Src), model.NodeState(ev.Dst), model.TransitionTypeLeave)
close(c.followerChan)
}

func (c *Consensus) enterCandidate(ctx context.Context, ev *fsm.Event) {
func (c *Consensus) enterCandidate(_ context.Context, ev *fsm.Event) {
c.logger.Info("become candidate")
c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter)
c.node.State = model.NodeStateCandidate
c.candidateChan = make(chan struct{}, 1)
go func() {
err := c.runCandidate()
Expand Down Expand Up @@ -357,18 +362,18 @@ func (c *Consensus) tryToBecomeLeader() error {
}
}

func (c *Consensus) leaveCandidate(ctx context.Context, ev *fsm.Event) {
func (c *Consensus) leaveCandidate(_ context.Context, ev *fsm.Event) {
c.logger.Info("leave candidate")
c.sendNodeStateTransition(model.NodeState(ev.Src), model.NodeState(ev.Dst), model.TransitionTypeLeave)
close(c.candidateChan)
}

func (c *Consensus) enterShutdown(ctx context.Context, ev *fsm.Event) {
func (c *Consensus) enterShutdown(_ context.Context, ev *fsm.Event) {
c.logger.Info("become shutdown")
c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter)
}

func (c *Consensus) leaveShutdown(ctx context.Context, ev *fsm.Event) {
func (c *Consensus) leaveShutdown(_ context.Context, ev *fsm.Event) {
c.logger.Info("leave shutdown")
c.sendNodeStateTransition(model.NodeState(ev.Src), model.NodeState(ev.Dst), model.TransitionTypeLeave)
close(c.shutdownChan)
Expand Down
10 changes: 5 additions & 5 deletions pkg/consensus/consensus_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (c *RpcHandler) HeartBeat(args *model.HeartBeatRequest, reply *model.HeartB
// update term of this node
c.setTerm(args.Term)

switch c.node.State {
switch model.NodeState(c.fsm.Current()) {
case model.NodeStateLeader:
// leave leader state
c.sendEvent(model.EventLeaveLeader)
Expand All @@ -59,13 +59,13 @@ func (c *RpcHandler) HeartBeat(args *model.HeartBeatRequest, reply *model.HeartB
// RequestVote handle vote request from peer node
func (c *RpcHandler) RequestVote(args *model.RequestVoteRequest, reply *model.RequestVoteResponse) error {
c.logger.Info("receive vote request", "from", args.NodeAddr, "term", args.Term, "current term", c.term)
// return when novote is true
// return when no-vote is true
if c.node.NoVote {
model.VoteResponse(reply, c.node.Node, false, common.VoteNoVoteNode.String())
return nil
}

switch c.node.State {
switch model.NodeState(c.fsm.Current()) {
case model.NodeStateLeader:
if args.Term <= c.term {
model.VoteResponse(reply, c.node.Node, false, common.VoteLeaderExist.String())
Expand Down Expand Up @@ -98,13 +98,13 @@ func (c *RpcHandler) RequestVote(args *model.RequestVoteRequest, reply *model.Re
}

// Ping handles ping request from peer node
func (c *RpcHandler) Ping(args struct{}, reply *string) error {
func (c *RpcHandler) Ping(_ struct{}, reply *string) error {
*reply = "pong"
return nil
}

// State return current node state
func (c *RpcHandler) State(args struct{}, reply *model.ElectNode) error {
func (c *RpcHandler) State(_ struct{}, reply *model.NodeWithState) error {
*reply = c.CurrentState()
return nil
}
27 changes: 19 additions & 8 deletions pkg/consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"log/slog"
"testing"

"github.com/looplab/fsm"
"github.com/stretchr/testify/assert"

"github.com/danl5/goelect/pkg/common"
Expand Down Expand Up @@ -78,6 +79,7 @@ func TestConsensus_HeartBeat(t *testing.T) {
termCache: tt.fields.termCache,
logger: tt.fields.logger,
eventChan: tt.fields.eventChan,
fsm: &fsm.FSM{},
},
}
if err := c.HeartBeat(tt.args.args, tt.args.reply); (err != nil) != tt.wantErr {
Expand All @@ -94,7 +96,7 @@ func TestConsensus_RequestVote(t *testing.T) {
type fields struct {
termCache *termCache
logger log.Logger
node model.ElectNode
fsm *fsm.FSM
eventChan chan model.NodeEvent
}
type args struct {
Expand All @@ -105,6 +107,15 @@ func TestConsensus_RequestVote(t *testing.T) {
vote bool
message string
}
leaderFsm := &fsm.FSM{}
leaderFsm.SetState(model.NodeStateLeader.String())

followerFsm := &fsm.FSM{}
followerFsm.SetState(model.NodeStateFollower.String())

candidateFsm := &fsm.FSM{}
candidateFsm.SetState(model.NodeStateCandidate.String())

tests := []struct {
name string
fields fields
Expand All @@ -120,7 +131,7 @@ func TestConsensus_RequestVote(t *testing.T) {
},
logger: slog.Default(),
eventChan: make(chan model.NodeEvent, 10),
node: model.ElectNode{State: model.NodeStateLeader},
fsm: leaderFsm,
},
args: args{
args: &model.RequestVoteRequest{
Expand All @@ -142,7 +153,7 @@ func TestConsensus_RequestVote(t *testing.T) {
},
logger: slog.Default(),
eventChan: make(chan model.NodeEvent, 10),
node: model.ElectNode{State: model.NodeStateLeader},
fsm: leaderFsm,
},
args: args{
args: &model.RequestVoteRequest{
Expand All @@ -164,7 +175,7 @@ func TestConsensus_RequestVote(t *testing.T) {
},
logger: slog.Default(),
eventChan: make(chan model.NodeEvent, 10),
node: model.ElectNode{State: model.NodeStateFollower},
fsm: followerFsm,
},
args: args{
args: &model.RequestVoteRequest{
Expand All @@ -186,7 +197,7 @@ func TestConsensus_RequestVote(t *testing.T) {
},
logger: slog.Default(),
eventChan: make(chan model.NodeEvent, 10),
node: model.ElectNode{State: model.NodeStateFollower},
fsm: followerFsm,
},
args: args{
args: &model.RequestVoteRequest{
Expand All @@ -208,7 +219,7 @@ func TestConsensus_RequestVote(t *testing.T) {
},
logger: slog.Default(),
eventChan: make(chan model.NodeEvent, 10),
node: model.ElectNode{State: model.NodeStateCandidate},
fsm: candidateFsm,
},
args: args{
args: &model.RequestVoteRequest{
Expand All @@ -230,7 +241,7 @@ func TestConsensus_RequestVote(t *testing.T) {
},
logger: slog.Default(),
eventChan: make(chan model.NodeEvent, 10),
node: model.ElectNode{State: model.NodeStateCandidate},
fsm: candidateFsm,
},
args: args{
args: &model.RequestVoteRequest{
Expand All @@ -251,7 +262,7 @@ func TestConsensus_RequestVote(t *testing.T) {
Consensus: &Consensus{
termCache: tt.fields.termCache,
logger: tt.fields.logger,
node: tt.fields.node,
fsm: tt.fields.fsm,
eventChan: tt.fields.eventChan,
},
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/model/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ func VoteResponse(resp *RequestVoteResponse, node Node, vote bool, msg string) {
resp.Message = msg
}

type NodeWithState struct {
State NodeState `json:"state"`
Node ElectNode `json:"node"`
}

// ClusterState represents the state of a cluster, including the nodes that make up the cluster.
type ClusterState struct {
Nodes map[string]*ElectNode `json:"nodes"`
Nodes map[string]*NodeWithState `json:"nodes"`
}
1 change: 0 additions & 1 deletion pkg/model/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,5 @@ func (n *Node) Validate() error {
type ElectNode struct {
Node

State NodeState
NoVote bool
}

0 comments on commit 4554270

Please sign in to comment.