From 5f5a0322a42d57bdbd247a8d557263f084f3ffaa Mon Sep 17 00:00:00 2001 From: gokul656 Date: Sun, 18 Feb 2024 00:51:48 +0530 Subject: [PATCH] moved from flag to yaml for configs --- Makefile | 11 ++----- config/env_config.go | 63 ++++++++++++++++++++++--------------- go.mod | 5 +-- go.sum | 7 +++-- internal/raft_logic.go | 6 ++-- node_configs/follower-1.yml | 15 +++++++++ node_configs/follower-2.yml | 15 +++++++++ node_configs/leader.yml | 13 ++++++++ peer/client.go | 2 ++ peer/peer.go | 2 +- peer/raft_hub.go | 32 ++++++++++--------- rpc/rpc_handler.go | 2 +- 12 files changed, 116 insertions(+), 57 deletions(-) create mode 100644 node_configs/follower-1.yml create mode 100644 node_configs/follower-2.yml create mode 100644 node_configs/leader.yml diff --git a/Makefile b/Makefile index cb7d6e8..9bbab4a 100644 --- a/Makefile +++ b/Makefile @@ -1,19 +1,14 @@ - BINARY_NAME=peer LEADER_RPC_PORT=51500 run-leader: - go run cmd/* --api-port 3001 --grpc-port ${LEADER_RPC_PORT} --name peer-0 + go run cmd/* --config node_configs/leader.yml run-follower-1: - go run cmd/* --api-port 3001 --grpc-port 51501 --leader localhost:${LEADER_RPC_PORT} --name peer-1 + go run cmd/* --config node_configs/follower-1.yml run-follower-2: - go run cmd/* --api-port 3001 --grpc-port 51502 --leader localhost:${LEADER_RPC_PORT} --name peer-2 - -run-leader-and-follower: - make run-leader &1 - make run-follower + go run cmd/* --config node_configs/follower-2.yml gen-proto: protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative protocol/cluster.proto diff --git a/config/env_config.go b/config/env_config.go index 5c34e84..a4464af 100644 --- a/config/env_config.go +++ b/config/env_config.go @@ -2,44 +2,57 @@ package config import ( "flag" + "log" + "os" + + "gopkg.in/yaml.v2" ) -var instaceIDFlag = flag.String("name", "peer-1", "provide peer name") -var leaderIDFlag = flag.String("leader-name", "peer-0", "provide leader name") -var rpcPortFlag = flag.String("grpc-port", "51500", "provide port for gRPC connection") -var apiPortFlag = flag.String("api-port", "3000", "provide port for API connection") -var knownLeader = flag.String("leader", "", "provide leader address") -var logDirFlag = flag.String("raft-dir", "./tmp/raft-logs", "provide directory to be used for storing logs") - -type EnvConfig struct { - InstanceID string - APIPort string - RPCPort string - LogDir string - Leader string - LeaderID string +var configFile = flag.String("config", "", "path for config.yml") + +type Config struct { + Server ServerConfig `yaml:"server"` +} + +type PeerConfig struct { + FallbackUrl string `yaml:"fallback_peer_url"` + Timeout uint64 `yaml:"health_check_timeout"` + Hosts []string `yaml:"hosts"` } -func (cfg *EnvConfig) LoadConfig() { +type ServerConfig struct { + Peer PeerConfig `yaml:"peer"` + InstanceID string `yaml:"instance_id,omitempty"` + APIPort string `yaml:"api_port,omitempty"` + RPCPort string `yaml:"rpc_port,omitempty"` + LogDir string `yaml:"log_dir,omitempty"` + Leader string `yaml:"leader,omitempty"` + LeaderID string `yaml:"leader_id,omitempty"` +} + +func (cfg *Config) LoadConfig() { flag.Parse() + if *configFile == "" { + log.Fatalln("unble to locate config.yml file") + } + + file, err := os.ReadFile(*configFile) + if err != nil { + log.Fatalln(err.Error()) + } - cfg.InstanceID = *instaceIDFlag - cfg.RPCPort = *rpcPortFlag - cfg.APIPort = *apiPortFlag - cfg.LogDir = *logDirFlag - cfg.Leader = *knownLeader - cfg.LeaderID = *leaderIDFlag + yaml.Unmarshal(file, env) } -var env *EnvConfig +var env *Config func init() { - env = &EnvConfig{} + env = &Config{} env.LoadConfig() setupLogDir() // creates a log dir if not exists } -func GetEnv() EnvConfig { - return *env +func GetEnv() ServerConfig { + return env.Server } diff --git a/go.mod b/go.mod index c94bbca..b99573a 100644 --- a/go.mod +++ b/go.mod @@ -7,9 +7,10 @@ require ( google.golang.org/protobuf v1.31.0 ) +require gopkg.in/yaml.v2 v2.4.0 + require ( - github.com/fsnotify/fsnotify v1.6.0 // direct - github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/protobuf v1.5.3 golang.org/x/net v0.12.0 // indirect golang.org/x/sys v0.10.0 // indirect golang.org/x/text v0.11.0 // indirect diff --git a/go.sum b/go.sum index c94537b..6eb31c5 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,3 @@ -github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= -github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= @@ -7,7 +5,6 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= -golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= @@ -21,3 +18,7 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/internal/raft_logic.go b/internal/raft_logic.go index 6b477d7..6474a68 100644 --- a/internal/raft_logic.go +++ b/internal/raft_logic.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "sync" + "time" "github.com/gokul656/raft-consensus/common" "github.com/gokul656/raft-consensus/config" @@ -26,7 +27,7 @@ func StartupRaft() { Cluster = peer.NewRaft(&peer.Peer{ Address: fmt.Sprintf("localhost:%s", env.RPCPort), Name: env.InstanceID, - State: protocol.PeerState_FOLLOWER.Enum(), + State: *protocol.PeerState_FOLLOWER.Enum(), }) Cluster.AddPeer(env.InstanceID, fmt.Sprintf("localhost:%s", env.RPCPort)) @@ -46,7 +47,8 @@ func runAsLeader() { env := config.GetEnv() Cluster.ChangeLeader(env.InstanceID) - go Cluster.CheckFollowersHealth() + timeout := time.Duration(env.Peer.Timeout) * time.Second + go Cluster.CheckFollowersHealth(timeout) } func runAsFollower() { diff --git a/node_configs/follower-1.yml b/node_configs/follower-1.yml new file mode 100644 index 0000000..900b4b8 --- /dev/null +++ b/node_configs/follower-1.yml @@ -0,0 +1,15 @@ +server: + instance_id: peer-1 + api_port: 3001 + rpc_port: 51501 + log_dir: ./tmp/raft-logs + leader: localhost:51500 + leader_id: peer-0 + peer: + fallback_peer_url: https://google.com/api/peers.txt + health_check_timeout: 2 # in seconds + hosts: + - 102.123.142.323 + - 102.123.142.323 + - 102.123.142.323 + - 102.123.142.323 \ No newline at end of file diff --git a/node_configs/follower-2.yml b/node_configs/follower-2.yml new file mode 100644 index 0000000..03abb1a --- /dev/null +++ b/node_configs/follower-2.yml @@ -0,0 +1,15 @@ +server: + instance_id: peer-2 + api_port: 3002 + rpc_port: 51502 + log_dir: ./tmp/raft-logs + leader: localhost:51500 + leader_id: peer-0 + peer: + fallback_peer_url: https://google.com/api/peers.txt + health_check_timeout: 2 # in seconds + hosts: + - 102.123.142.323 + - 102.123.142.323 + - 102.123.142.323 + - 102.123.142.323 \ No newline at end of file diff --git a/node_configs/leader.yml b/node_configs/leader.yml new file mode 100644 index 0000000..7e5b7d9 --- /dev/null +++ b/node_configs/leader.yml @@ -0,0 +1,13 @@ +server: + instance_id: peer-0 + api_port: 3000 + rpc_port: 51500 + log_dir: ./tmp/raft-logs + peer: + fallback_peer_url: https://google.com/api/peers.txt + health_check_timeout: 2 # in seconds + hosts: + - 102.123.142.323 + - 102.123.142.323 + - 102.123.142.323 + - 102.123.142.323 \ No newline at end of file diff --git a/peer/client.go b/peer/client.go index 08efb67..f12e675 100644 --- a/peer/client.go +++ b/peer/client.go @@ -3,6 +3,7 @@ package peer import ( "context" "log" + "math/rand" "time" "github.com/gokul656/raft-consensus/common" @@ -51,6 +52,7 @@ func (r *RaftHub) InvokePeerElection(ctx context.Context, address string, messag return nil, common.ErrPeerUnavailable } + time.Sleep(time.Duration(rand.Intn(301)+200) * time.Millisecond) response, err := client.InitiateElection(ctx, message) if err != nil { return nil, common.ErrPeerUnavailable diff --git a/peer/peer.go b/peer/peer.go index 60ef68c..6c80235 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -11,7 +11,7 @@ import ( type Peer struct { Address string Name string - State *protocol.PeerState `protobuf:"enum=State" json:"State"` + State protocol.PeerState `protobuf:"enum=State" json:"State"` } func (p *Peer) CheckIsAlive() bool { diff --git a/peer/raft_hub.go b/peer/raft_hub.go index 7b304de..5882cec 100644 --- a/peer/raft_hub.go +++ b/peer/raft_hub.go @@ -3,6 +3,7 @@ package peer import ( "context" "log" + "math/rand" "time" "github.com/gokul656/raft-consensus/common" @@ -21,7 +22,7 @@ func (r *RaftHub) AddPeer(name, address string) { r.peers.Put(name, &Peer{ Address: address, Name: name, - State: protocol.PeerState_FOLLOWER.Enum(), + State: *protocol.PeerState_FOLLOWER.Enum(), }) go r.notifyAll(&protocol.Event{Message: &protocol.Event_PeerAddedEvent{ @@ -43,9 +44,9 @@ func (r *RaftHub) InitiateElection() { Term: uint64(r.term), } - log.Println("[INFO] Initiation election", r.peers) + log.Println("[INFO] Initiating election", r.peers) for _, peer := range r.peers.GetEntries() { - if peer != r.Self { + if peer != r.Self && peer.State != *protocol.PeerState_DEAD.Enum() { response, err := r.InvokePeerElection(context.Background(), peer.Address, electionRequest, ElectionTimeout()) if err == nil { log.Println(response.String()) @@ -73,7 +74,7 @@ func (r RaftHub) GetPeerList() Map[string, *Peer] { return r.peers } -func (r *RaftHub) UpdatePeerStatus(name string, state *protocol.PeerState) { +func (r *RaftHub) UpdatePeerStatus(name string, state protocol.PeerState) { peer := r.GetPeer(name) if peer == nil { log.Println("[INFO] Peer not registered", name) @@ -86,7 +87,7 @@ func (r *RaftHub) UpdatePeerStatus(name string, state *protocol.PeerState) { Message: &protocol.Event_PeerStateChangeEvent{ PeerStateChangeEvent: &protocol.PeerStateChangeEvent{ Name: name, - PeerState: *state, + PeerState: state, }, }, }) @@ -101,7 +102,7 @@ func (r *RaftHub) ChangeLeader(name string) error { // test Leader connection before making as Leader r.Leader = peer - r.UpdatePeerStatus(name, protocol.PeerState_LEADER.Enum()) + r.UpdatePeerStatus(name, *protocol.PeerState_LEADER.Enum()) return nil } @@ -112,22 +113,24 @@ func (r *RaftHub) CheckLeaderHealth() { for range ticker.C { if r.Self != r.Leader { if !r.Leader.CheckIsAlive() { - r.UpdatePeerStatus(r.Leader.Name, protocol.PeerState_DEAD.Enum()) + r.UpdatePeerStatus(r.Leader.Name, *protocol.PeerState_DEAD.Enum()) r.InitiateElection() } } } } -func (r *RaftHub) CheckFollowersHealth() { - ticker := time.NewTicker(common.FollowerHealthCheckDelay) +func (r *RaftHub) CheckFollowersHealth(d time.Duration) { + ticker := time.NewTicker(d) defer ticker.Stop() for range ticker.C { for _, peer := range r.peers.entry { - if peer.State != protocol.PeerState_LEADER.Enum() { - if !peer.CheckIsAlive() { - r.UpdatePeerStatus(peer.Name, protocol.PeerState_DEAD.Enum()) + if peer.State != *protocol.PeerState_LEADER.Enum() { + existingPeer := r.GetPeer(peer.Name) + if !peer.CheckIsAlive() && existingPeer.State != *protocol.PeerState_DEAD.Enum() { + log.Printf("peer dead: %s", peer.Name) + r.UpdatePeerStatus(peer.Name, *protocol.PeerState_DEAD.Enum()) } } } @@ -147,15 +150,14 @@ func (r *RaftHub) Synchronize() { r.AddPeer(peer.Name, peer.Address) } - r.UpdatePeerStatus(peer.Name, protocol.PeerState(protocol.PeerState_value[peer.State]).Enum()) + r.UpdatePeerStatus(peer.Name, *protocol.PeerState(protocol.PeerState_value[peer.State]).Enum()) } log.Println("[INFO] Synchronizing success", r.peers) } func ElectionTimeout() time.Duration { - // return time.Duration((time.Duration(rand.Intn(300-500+1)) + 300) * time.Millisecond) - return 3 * time.Second + return time.Duration(rand.Intn(301)+200) * time.Millisecond } func NewRaft(self *Peer) *RaftHub { diff --git a/rpc/rpc_handler.go b/rpc/rpc_handler.go index 4c7cc3a..c7243c2 100644 --- a/rpc/rpc_handler.go +++ b/rpc/rpc_handler.go @@ -78,7 +78,7 @@ func (s *GRPCServer) NotifyAll(ctx context.Context, req *protocol.Event) (*proto hub.RemovePeer(req.GetPeerRemovedEvent().Name) case *protocol.Event_PeerStateChangeEvent: event := req.GetPeerStateChangeEvent() - hub.UpdatePeerStatus(event.Name, event.PeerState.Enum()) + hub.UpdatePeerStatus(event.Name, *event.PeerState.Enum()) log.Println("Updating event", event.Name, event.PeerState.Enum()) default: log.Println("Updating event", req)