Skip to content

Commit

Permalink
implemented synchronization between peers
Browse files Browse the repository at this point in the history
  • Loading branch information
gokul656 committed Oct 10, 2023
1 parent df73ba9 commit eccc246
Show file tree
Hide file tree
Showing 19 changed files with 634 additions and 308 deletions.
23 changes: 20 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,8 +1,25 @@
run-peer:
go run cmd/* --api-port 3001

BINARY_NAME=peer
LEADER_RPC_PORT=51500

run-leader:
go run cmd/* --api-port 3001 --grpc-port ${LEADER_RPC_PORT} --name peer-0

run-follower-1:
go run cmd/* --api-port 3001 --grpc-port 51501 --leader localhost:${LEADER_RPC_PORT} --name peer-1

run-follower-2:
go run cmd/* --api-port 3001 --grpc-port 51502 --leader localhost:${LEADER_RPC_PORT} --name peer-2

run-leader-and-follower:
make run-leader &1
make run-follower

gen-proto:
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative protocol/cluster.proto

build:
go build -o bin/peer cmd/*
make gen-proto
GOARCH=amd64 GOOS=darwin go build -pgo=auto -o bin/${BINARY_NAME}-darwin cmd/*
GOARCH=amd64 GOOS=linux go build -pgo=auto -o bin/${BINARY_NAME}-linux cmd/*
GOARCH=amd64 GOOS=windows go build -pgo=auto -o bin/${BINARY_NAME}-windows cmd/*
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,6 @@ Install the necessary Protobuf tools with the following commands:
The project is a work in progress, and the following tasks are planned for future development:

* Need to implement Raft election logic
* Implement loggers
* Admin cli to interact with peers
* Containerization
30 changes: 0 additions & 30 deletions cmd/bootstrap.go

This file was deleted.

10 changes: 6 additions & 4 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/gokul656/raft-consensus/common"
"github.com/gokul656/raft-consensus/config"
"github.com/gokul656/raft-consensus/internal"
"github.com/gokul656/raft-consensus/protocol"
"github.com/gokul656/raft-consensus/rpc"
"google.golang.org/grpc"
Expand All @@ -19,25 +20,26 @@ func main() {
rpcPort := fmt.Sprintf(":%s", conf.RPCPort)
apiPort := fmt.Sprintf(":%s", conf.APIPort)

go setupRest(apiPort)
go setupRPC(rpcPort)
go setupRest(apiPort)

log.Println("[ rpc ] listening at ", rpcPort)
log.Println("[ api ] listening at ", apiPort)
log.Println("[ log ] files can be found at", conf.TmpDir)
log.Println("[ log ] files can be found at", conf.LogDir)

select {}
}

func setupRPC(port string) {
defer common.HandlePanic()
defer common.HandlePanic("setupRPC")

server := grpc.NewServer()
listen, err := net.Listen("tcp", port)
if err != nil {
panic(common.InvalidRPCPort)
log.Fatalln("bind address already in use")
}

internal.StartupRaft()
protocol.RegisterClusterServer(server, rpc.NewgRPCServer())
reflection.Register(server)

Expand Down
35 changes: 27 additions & 8 deletions common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,37 @@ import (
"log"
)

type CustomError struct {
Err error
Msg string
}

var (
InvalidLeader = errors.New("leader not found")
InvalidRPCPort = errors.New("invalid RPC port")
InvalidAPIPort = errors.New("invalid API port")
InvalidLogPath = errors.New("invalid Log path")
ErrInvalidLeader = errors.New("leader not found")
ErrInvalidPeer = errors.New("invalid peer")
ErrInvalidRPCPort = errors.New("invalid RPC port")
ErrInvalidAPIPort = errors.New("invalid API port")
ErrInvalidLogPath = errors.New("invalid Log path")

UnableToUnmarshal = errors.New("unable to convert struct to byte[]")
UnableToMarshal = errors.New("unable to byte[] struct to struct")
ErrPeerUnavailable = errors.New("unable to establish connection with peer")

ErrUnableToUnmarshal = errors.New("unable to convert struct to byte[]")
ErrUnableToMarshal = errors.New("unable to byte[] struct to struct")
)

func HandlePanic() {
func HandlePanic(msg string) {
if recover := recover(); recover != nil {
log.Fatalln("[ERR]", recover)
switch err := recover.(type) {
case CustomError:
switch err.Err {
case ErrPeerUnavailable:
log.Println(msg, "peer unavailable", err.Msg)
default:
log.Println(msg, recover)
}
default:
log.Println(err)
}

}
}
17 changes: 15 additions & 2 deletions common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,24 @@ package common

import (
"encoding/json"
"fmt"
"log"
"time"
)

const (
FollowerHealthCheckDelay = 2 * time.Second
LeaderHealthCheckDelay = 2 * time.Second
)

var startupTime = time.Now().UnixMilli()

func ToByte(data interface{}) ([]byte, error) {
log.SetPrefix("[ERR]")

marhsalled, err := json.Marshal(data)
if err != nil {
return nil, UnableToUnmarshal
return nil, ErrUnableToUnmarshal
}

return marhsalled, nil
Expand All @@ -19,8 +28,12 @@ func ToByte(data interface{}) ([]byte, error) {
func FromByte[T interface{}](data []byte, result T) (T, error) {
err := json.Unmarshal(data, &result)
if err != nil {
return result, UnableToMarshal
return result, ErrUnableToMarshal
}

return result, nil
}

func GetLogfileName(dir string) string {
return fmt.Sprintf("%s/raft-%d.log", dir, startupTime)
}
19 changes: 9 additions & 10 deletions config/env_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,38 @@ package config

import (
"flag"

"github.com/gokul656/raft-consensus/common"
)

var instaceIDFLag = flag.String("name", "peer-0", "provide peer name")
var rpcPortFlag = flag.String("grpc-port", "51505", "provide port for gRPC connection")
var instaceIDFlag = flag.String("name", "peer-1", "provide peer name")
var leaderIDFlag = flag.String("leader-name", "peer-0", "provide leader name")
var rpcPortFlag = flag.String("grpc-port", "51500", "provide port for gRPC connection")
var apiPortFlag = flag.String("api-port", "3000", "provide port for API connection")
var knownLeader = flag.String("leader", "", "provide leader address")
var tmpDirFlag = flag.String("raft-dir", "./tmp/raft-logs", "provide directory to be used for storing logs")
var logDirFlag = flag.String("raft-dir", "./tmp/raft-logs", "provide directory to be used for storing logs")

type EnvConfig struct {
InstanceID string
APIPort string
RPCPort string
TmpDir string
LogDir string
Leader string
LeaderID string
}

func (cfg *EnvConfig) LoadConfig() {
flag.Parse()

cfg.InstanceID = *instaceIDFLag
cfg.InstanceID = *instaceIDFlag
cfg.RPCPort = *rpcPortFlag
cfg.APIPort = *apiPortFlag
cfg.TmpDir = *tmpDirFlag
cfg.LogDir = *logDirFlag
cfg.Leader = *knownLeader
cfg.LeaderID = *leaderIDFlag
}

var env *EnvConfig

func init() {
defer common.HandlePanic()

env = &EnvConfig{}
env.LoadConfig()

Expand Down
19 changes: 18 additions & 1 deletion config/log_config.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,32 @@
package config

import (
"io"
"log"
"os"
)

func setupLogDir() {
dest := GetEnv().TmpDir
dest := GetEnv().LogDir
_, err := os.Stat(dest)
if err != nil {
if err := os.MkdirAll(dest, os.ModePerm); err != nil {
panic(err)
}
}
}

func EnableLogging(filename string) {
logFile, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Fatalln(err)
}

// multi writer for writing logs both to console & file
mw := io.MultiWriter(logFile, os.Stdout)
log.SetOutput(mw)
}

func DisableLoggin() {
log.SetOutput(os.Stdout)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
)

require (
github.com/fsnotify/fsnotify v1.6.0 // direct
github.com/golang/protobuf v1.5.3 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sys v0.10.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50=
golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4=
Expand Down
71 changes: 71 additions & 0 deletions internal/raft_logic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package internal

import (
"context"
"fmt"
"log"
"sync"

"github.com/gokul656/raft-consensus/common"
"github.com/gokul656/raft-consensus/config"
"github.com/gokul656/raft-consensus/peer"
"github.com/gokul656/raft-consensus/protocol"
)

var Cluster *peer.RaftHub
var RPCServer protocol.ClusterServer
var once sync.Once

func StartupRaft() {
log.Println("Setting up cluster...")
once.Do(
func() {
defer common.HandlePanic("raft_state")

env := config.GetEnv()
Cluster = peer.NewRaft(&peer.Peer{
Address: fmt.Sprintf("localhost:%s", env.RPCPort),
Name: env.InstanceID,
State: protocol.PeerState_FOLLOWER.Enum(),
})

Cluster.AddPeer(env.InstanceID, fmt.Sprintf("localhost:%s", env.RPCPort))
Cluster.Self = Cluster.GetPeer(env.InstanceID)

// if there are no leaders, the current peer elects itself as leader & initiates election
if env.Leader == "" {
runAsLeader()
} else {
runAsFollower()
}
},
)
}

func runAsLeader() {
env := config.GetEnv()
Cluster.ChangeLeader(env.InstanceID)

go Cluster.CheckFollowersHealth()
}

func runAsFollower() {
env := config.GetEnv()

Cluster.AddPeer(env.LeaderID, env.Leader)
Cluster.ChangeLeader(env.LeaderID)

err := Cluster.Register(context.Background(), &protocol.AddPeerRequest{
Peer: &protocol.Peer{
Name: Cluster.Self.Name,
Address: Cluster.Self.Address,
},
})

if err != nil {
log.Fatalln("[CRITICAL] Unable to register as Follower", err)
}

Cluster.Synchronize()
go Cluster.CheckLeaderHealth()
}
Loading

0 comments on commit eccc246

Please sign in to comment.