Skip to content

Commit

Permalink
Merge pull request #7 from gokul656/dev
Browse files Browse the repository at this point in the history
moved from flag to yaml for configs
  • Loading branch information
gokul656 authored Feb 17, 2024
2 parents 88225ae + 5f5a032 commit e79e798
Show file tree
Hide file tree
Showing 12 changed files with 116 additions and 57 deletions.
11 changes: 3 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@

BINARY_NAME=peer
LEADER_RPC_PORT=51500

run-leader:
go run cmd/* --api-port 3001 --grpc-port ${LEADER_RPC_PORT} --name peer-0
go run cmd/* --config node_configs/leader.yml

run-follower-1:
go run cmd/* --api-port 3001 --grpc-port 51501 --leader localhost:${LEADER_RPC_PORT} --name peer-1
go run cmd/* --config node_configs/follower-1.yml

run-follower-2:
go run cmd/* --api-port 3001 --grpc-port 51502 --leader localhost:${LEADER_RPC_PORT} --name peer-2

run-leader-and-follower:
make run-leader &1
make run-follower
go run cmd/* --config node_configs/follower-2.yml

gen-proto:
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative protocol/cluster.proto
Expand Down
63 changes: 38 additions & 25 deletions config/env_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,57 @@ package config

import (
"flag"
"log"
"os"

"gopkg.in/yaml.v2"
)

var instaceIDFlag = flag.String("name", "peer-1", "provide peer name")
var leaderIDFlag = flag.String("leader-name", "peer-0", "provide leader name")
var rpcPortFlag = flag.String("grpc-port", "51500", "provide port for gRPC connection")
var apiPortFlag = flag.String("api-port", "3000", "provide port for API connection")
var knownLeader = flag.String("leader", "", "provide leader address")
var logDirFlag = flag.String("raft-dir", "./tmp/raft-logs", "provide directory to be used for storing logs")

type EnvConfig struct {
InstanceID string
APIPort string
RPCPort string
LogDir string
Leader string
LeaderID string
var configFile = flag.String("config", "", "path for config.yml")

type Config struct {
Server ServerConfig `yaml:"server"`
}

type PeerConfig struct {
FallbackUrl string `yaml:"fallback_peer_url"`
Timeout uint64 `yaml:"health_check_timeout"`
Hosts []string `yaml:"hosts"`
}

func (cfg *EnvConfig) LoadConfig() {
type ServerConfig struct {
Peer PeerConfig `yaml:"peer"`
InstanceID string `yaml:"instance_id,omitempty"`
APIPort string `yaml:"api_port,omitempty"`
RPCPort string `yaml:"rpc_port,omitempty"`
LogDir string `yaml:"log_dir,omitempty"`
Leader string `yaml:"leader,omitempty"`
LeaderID string `yaml:"leader_id,omitempty"`
}

func (cfg *Config) LoadConfig() {
flag.Parse()
if *configFile == "" {
log.Fatalln("unble to locate config.yml file")
}

file, err := os.ReadFile(*configFile)
if err != nil {
log.Fatalln(err.Error())
}

cfg.InstanceID = *instaceIDFlag
cfg.RPCPort = *rpcPortFlag
cfg.APIPort = *apiPortFlag
cfg.LogDir = *logDirFlag
cfg.Leader = *knownLeader
cfg.LeaderID = *leaderIDFlag
yaml.Unmarshal(file, env)
}

var env *EnvConfig
var env *Config

func init() {
env = &EnvConfig{}
env = &Config{}
env.LoadConfig()

setupLogDir() // creates a log dir if not exists
}

func GetEnv() EnvConfig {
return *env
func GetEnv() ServerConfig {
return env.Server
}
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ require (
google.golang.org/protobuf v1.31.0
)

require gopkg.in/yaml.v2 v2.4.0

require (
github.com/fsnotify/fsnotify v1.6.0 // direct
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/protobuf v1.5.3
golang.org/x/net v0.12.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50=
golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4=
Expand All @@ -21,3 +18,7 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
6 changes: 4 additions & 2 deletions internal/raft_logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log"
"sync"
"time"

"github.com/gokul656/raft-consensus/common"
"github.com/gokul656/raft-consensus/config"
Expand All @@ -26,7 +27,7 @@ func StartupRaft() {
Cluster = peer.NewRaft(&peer.Peer{
Address: fmt.Sprintf("localhost:%s", env.RPCPort),
Name: env.InstanceID,
State: protocol.PeerState_FOLLOWER.Enum(),
State: *protocol.PeerState_FOLLOWER.Enum(),
})

Cluster.AddPeer(env.InstanceID, fmt.Sprintf("localhost:%s", env.RPCPort))
Expand All @@ -46,7 +47,8 @@ func runAsLeader() {
env := config.GetEnv()
Cluster.ChangeLeader(env.InstanceID)

go Cluster.CheckFollowersHealth()
timeout := time.Duration(env.Peer.Timeout) * time.Second
go Cluster.CheckFollowersHealth(timeout)
}

func runAsFollower() {
Expand Down
15 changes: 15 additions & 0 deletions node_configs/follower-1.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
server:
instance_id: peer-1
api_port: 3001
rpc_port: 51501
log_dir: ./tmp/raft-logs
leader: localhost:51500
leader_id: peer-0
peer:
fallback_peer_url: https://google.com/api/peers.txt
health_check_timeout: 2 # in seconds
hosts:
- 102.123.142.323
- 102.123.142.323
- 102.123.142.323
- 102.123.142.323
15 changes: 15 additions & 0 deletions node_configs/follower-2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
server:
instance_id: peer-2
api_port: 3002
rpc_port: 51502
log_dir: ./tmp/raft-logs
leader: localhost:51500
leader_id: peer-0
peer:
fallback_peer_url: https://google.com/api/peers.txt
health_check_timeout: 2 # in seconds
hosts:
- 102.123.142.323
- 102.123.142.323
- 102.123.142.323
- 102.123.142.323
13 changes: 13 additions & 0 deletions node_configs/leader.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
server:
instance_id: peer-0
api_port: 3000
rpc_port: 51500
log_dir: ./tmp/raft-logs
peer:
fallback_peer_url: https://google.com/api/peers.txt
health_check_timeout: 2 # in seconds
hosts:
- 102.123.142.323
- 102.123.142.323
- 102.123.142.323
- 102.123.142.323
2 changes: 2 additions & 0 deletions peer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package peer
import (
"context"
"log"
"math/rand"
"time"

"github.com/gokul656/raft-consensus/common"
Expand Down Expand Up @@ -51,6 +52,7 @@ func (r *RaftHub) InvokePeerElection(ctx context.Context, address string, messag
return nil, common.ErrPeerUnavailable
}

time.Sleep(time.Duration(rand.Intn(301)+200) * time.Millisecond)
response, err := client.InitiateElection(ctx, message)
if err != nil {
return nil, common.ErrPeerUnavailable
Expand Down
2 changes: 1 addition & 1 deletion peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
type Peer struct {
Address string
Name string
State *protocol.PeerState `protobuf:"enum=State" json:"State"`
State protocol.PeerState `protobuf:"enum=State" json:"State"`
}

func (p *Peer) CheckIsAlive() bool {
Expand Down
32 changes: 17 additions & 15 deletions peer/raft_hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package peer
import (
"context"
"log"
"math/rand"
"time"

"github.com/gokul656/raft-consensus/common"
Expand All @@ -21,7 +22,7 @@ func (r *RaftHub) AddPeer(name, address string) {
r.peers.Put(name, &Peer{
Address: address,
Name: name,
State: protocol.PeerState_FOLLOWER.Enum(),
State: *protocol.PeerState_FOLLOWER.Enum(),
})

go r.notifyAll(&protocol.Event{Message: &protocol.Event_PeerAddedEvent{
Expand All @@ -43,9 +44,9 @@ func (r *RaftHub) InitiateElection() {
Term: uint64(r.term),
}

log.Println("[INFO] Initiation election", r.peers)
log.Println("[INFO] Initiating election", r.peers)
for _, peer := range r.peers.GetEntries() {
if peer != r.Self {
if peer != r.Self && peer.State != *protocol.PeerState_DEAD.Enum() {
response, err := r.InvokePeerElection(context.Background(), peer.Address, electionRequest, ElectionTimeout())
if err == nil {
log.Println(response.String())
Expand Down Expand Up @@ -73,7 +74,7 @@ func (r RaftHub) GetPeerList() Map[string, *Peer] {
return r.peers
}

func (r *RaftHub) UpdatePeerStatus(name string, state *protocol.PeerState) {
func (r *RaftHub) UpdatePeerStatus(name string, state protocol.PeerState) {
peer := r.GetPeer(name)
if peer == nil {
log.Println("[INFO] Peer not registered", name)
Expand All @@ -86,7 +87,7 @@ func (r *RaftHub) UpdatePeerStatus(name string, state *protocol.PeerState) {
Message: &protocol.Event_PeerStateChangeEvent{
PeerStateChangeEvent: &protocol.PeerStateChangeEvent{
Name: name,
PeerState: *state,
PeerState: state,
},
},
})
Expand All @@ -101,7 +102,7 @@ func (r *RaftHub) ChangeLeader(name string) error {

// test Leader connection before making as Leader
r.Leader = peer
r.UpdatePeerStatus(name, protocol.PeerState_LEADER.Enum())
r.UpdatePeerStatus(name, *protocol.PeerState_LEADER.Enum())
return nil
}

Expand All @@ -112,22 +113,24 @@ func (r *RaftHub) CheckLeaderHealth() {
for range ticker.C {
if r.Self != r.Leader {
if !r.Leader.CheckIsAlive() {
r.UpdatePeerStatus(r.Leader.Name, protocol.PeerState_DEAD.Enum())
r.UpdatePeerStatus(r.Leader.Name, *protocol.PeerState_DEAD.Enum())
r.InitiateElection()
}
}
}
}

func (r *RaftHub) CheckFollowersHealth() {
ticker := time.NewTicker(common.FollowerHealthCheckDelay)
func (r *RaftHub) CheckFollowersHealth(d time.Duration) {
ticker := time.NewTicker(d)
defer ticker.Stop()

for range ticker.C {
for _, peer := range r.peers.entry {
if peer.State != protocol.PeerState_LEADER.Enum() {
if !peer.CheckIsAlive() {
r.UpdatePeerStatus(peer.Name, protocol.PeerState_DEAD.Enum())
if peer.State != *protocol.PeerState_LEADER.Enum() {
existingPeer := r.GetPeer(peer.Name)
if !peer.CheckIsAlive() && existingPeer.State != *protocol.PeerState_DEAD.Enum() {
log.Printf("peer dead: %s", peer.Name)
r.UpdatePeerStatus(peer.Name, *protocol.PeerState_DEAD.Enum())
}
}
}
Expand All @@ -147,15 +150,14 @@ func (r *RaftHub) Synchronize() {
r.AddPeer(peer.Name, peer.Address)
}

r.UpdatePeerStatus(peer.Name, protocol.PeerState(protocol.PeerState_value[peer.State]).Enum())
r.UpdatePeerStatus(peer.Name, *protocol.PeerState(protocol.PeerState_value[peer.State]).Enum())
}

log.Println("[INFO] Synchronizing success", r.peers)
}

func ElectionTimeout() time.Duration {
// return time.Duration((time.Duration(rand.Intn(300-500+1)) + 300) * time.Millisecond)
return 3 * time.Second
return time.Duration(rand.Intn(301)+200) * time.Millisecond
}

func NewRaft(self *Peer) *RaftHub {
Expand Down
2 changes: 1 addition & 1 deletion rpc/rpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (s *GRPCServer) NotifyAll(ctx context.Context, req *protocol.Event) (*proto
hub.RemovePeer(req.GetPeerRemovedEvent().Name)
case *protocol.Event_PeerStateChangeEvent:
event := req.GetPeerStateChangeEvent()
hub.UpdatePeerStatus(event.Name, event.PeerState.Enum())
hub.UpdatePeerStatus(event.Name, *event.PeerState.Enum())
log.Println("Updating event", event.Name, event.PeerState.Enum())
default:
log.Println("Updating event", req)
Expand Down

0 comments on commit e79e798

Please sign in to comment.