add docker deploy scripts
LLiuJJ committed Sep 21, 2024
1 parent 193a4b3 commit f75dd43
Showing 16 changed files with 393 additions and 364 deletions.
4 changes: 2 additions & 2 deletions Makefile
@@ -29,11 +29,11 @@ default: meta_cli shard_server shard_cli meta_server
image:
docker build -f Dockerfile --network=host -t $(BUILDER_IMAGE) .

build_dev:
build-dev:
chmod +x scripts/build_dev.sh
docker run -it --rm -v $(realpath .):/eraft eraft/eraftbook:$(IMAGE_VERSION) /eraft/scripts/build_dev.sh

run_test:
run-test:
chmod +x scripts/run_tests.sh
docker run --name test-cli-node --network mytestnetwork --ip 172.18.0.5 -it --rm -v $(realpath .):/eraft eraft/eraftbook:$(IMAGE_VERSION) /eraft/scripts/run_tests.sh

27 changes: 27 additions & 0 deletions README.md
@@ -117,6 +117,33 @@ eraft uses hash-based sharding: data is mapped to buckets through a hash algorithm

It is mainly responsible for cluster data storage; typically three machines form one raft group, which provides a highly available service.

### Running inside containers

Build the image
```
make image
```

Compile the code
```
make build-dev
```

Run the demo cluster
```
make run-demo
```

Run the read/write tests
```
make run-test
```

Stop the cluster
```
make stop-demo
```

### Building the project

Build dependencies
20 changes: 10 additions & 10 deletions cmd/metacli/metacli.go
@@ -49,15 +49,15 @@ func main() {
sigs := make(chan os.Signal, 1)

addrs := strings.Split(os.Args[1], ",")
meta_cli := metaserver.MakeMetaSvrClient(common.UN_UNSED_TID, addrs)
metaCli := metaserver.MakeMetaSvrClient(common.UN_UNSED_TID, addrs)

sig_chan := make(chan os.Signal, 1)
signal.Notify(sig_chan)
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan)

go func() {
sig := <-sigs
fmt.Println(sig)
for _, cli := range meta_cli.GetRpcClis() {
for _, cli := range metaCli.GetRpcClis() {
cli.CloseAllConn()
}
os.Exit(-1)
@@ -69,18 +69,18 @@ func main() {
gid, _ := strconv.Atoi(os.Args[3])
addr_map := make(map[int64]string)
addr_map[int64(gid)] = os.Args[4]
meta_cli.Join(addr_map)
metaCli.Join(addr_map)
}
case "leave":
{
gid, _ := strconv.Atoi(os.Args[3])
meta_cli.Leave([]int64{int64(gid)})
metaCli.Leave([]int64{int64(gid)})
}
case "query":
{
last_conf := meta_cli.Query(-1)
out_bytes, _ := json.Marshal(last_conf)
raftcore.PrintDebugLog("latest configuration: " + string(out_bytes))
lastConf := metaCli.Query(-1)
outBytes, _ := json.Marshal(lastConf)
raftcore.PrintDebugLog("latest configuration: " + string(outBytes))
}
case "move":
{
@@ -94,7 +94,7 @@ func main() {
gid, _ := strconv.Atoi(os.Args[4])

for i := start; i <= end; i++ {
meta_cli.Move(i, gid)
metaCli.Move(i, gid)
}
}
}
22 changes: 11 additions & 11 deletions cmd/metasvr/metasvr.go
@@ -50,26 +50,26 @@ func main() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

node_id_str := os.Args[1]
node_id, err := strconv.Atoi(node_id_str)
nodeIdStr := os.Args[1]
nodeID, err := strconv.Atoi(nodeIdStr)
if err != nil {
panic(err)
}
meta_svr_addrs := strings.Split(os.Args[2], ",")
cf_peer_map := make(map[int]string)
for i, addr := range meta_svr_addrs {
cf_peer_map[i] = addr
metaSvrAddrs := strings.Split(os.Args[2], ",")
cfPeerMap := make(map[int]string)
for i, addr := range metaSvrAddrs {
cfPeerMap[i] = addr
}

meta_svr := metaserver.MakeMetaServer(cf_peer_map, node_id)
lis, err := net.Listen("tcp", cf_peer_map[node_id])
metaSvr := metaserver.MakeMetaServer(cfPeerMap, nodeID)
lis, err := net.Listen("tcp", cfPeerMap[nodeID])
if err != nil {
fmt.Printf("failed to listen: %v", err)
return
}
s := grpc.NewServer()

pb.RegisterRaftServiceServer(s, meta_svr)
pb.RegisterRaftServiceServer(s, metaSvr)

sigChan := make(chan os.Signal, 1)

@@ -78,8 +78,8 @@ func main() {
go func() {
sig := <-sigs
fmt.Println(sig)
meta_svr.Rf.CloseEndsConn()
meta_svr.StopApply()
metaSvr.Rf.CloseEndsConn()
metaSvr.StopApply()
os.Exit(-1)
}()
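
The two server binaries (metasvr above, shardsvr below) share the same boot-and-shutdown shape: listen, register the raft service, and trap SIGINT/SIGTERM so peer connections and the applier can be closed before exit. A condensed, self-contained sketch of that pattern — the port and the elided service registration are placeholders, not the repo's values:

```go
package main

import (
	"fmt"
	"net"
	"os"
	"os/signal"
	"syscall"

	"google.golang.org/grpc"
)

func main() {
	// trap SIGINT/SIGTERM so cleanup can run before the process exits
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	lis, err := net.Listen("tcp", ":8088") // assumption: any free port
	if err != nil {
		fmt.Printf("failed to listen: %v", err)
		return
	}
	s := grpc.NewServer()
	// the real binaries call pb.RegisterRaftServiceServer(s, server) here

	go func() {
		sig := <-sigs
		fmt.Println(sig)
		// the real binaries close raft peer connections and stop the applier here
		os.Exit(-1)
	}()

	if err := s.Serve(lis); err != nil {
		fmt.Printf("failed to serve: %v", err)
	}
}
```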

32 changes: 16 additions & 16 deletions cmd/shardcli/shardcli.go
@@ -47,19 +47,19 @@ func main() {
}
sigs := make(chan os.Signal, 1)

shard_kvcli := shardkvserver.MakeKvClient(os.Args[1])
shardKvCli := shardkvserver.MakeKvClient(os.Args[1])

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan)

switch os.Args[2] {
case "put":
if err := shard_kvcli.Put(os.Args[3], os.Args[4]); err != nil {
if err := shardKvCli.Put(os.Args[3], os.Args[4]); err != nil {
fmt.Println("err: " + err.Error())
return
}
case "get":
v, err := shard_kvcli.Get(os.Args[3])
v, err := shardKvCli.Get(os.Args[3])
if err != nil {
fmt.Println("err: " + err.Error())
return
@@ -69,40 +69,40 @@ func main() {
gid, _ := strconv.Atoi(os.Args[3])
bidsStr := os.Args[4]
bids := []int64{}
bids_strarr := strings.Split(bidsStr, ",")
for _, bidStr := range bids_strarr {
bidsStrArr := strings.Split(bidsStr, ",")
for _, bidStr := range bidsStrArr {
bid, _ := strconv.Atoi(bidStr)
bids = append(bids, int64(bid))
}
datas := shard_kvcli.GetBucketDatas(gid, bids)
datas := shardKvCli.GetBucketDatas(gid, bids)
fmt.Println("get buckets datas: " + datas)
case "delbuckets":
gid, _ := strconv.Atoi(os.Args[3])
bidsStr := os.Args[4]
bids := []int64{}
bids_strarr := strings.Split(bidsStr, ",")
for _, bidStr := range bids_strarr {
bidsStrArr := strings.Split(bidsStr, ",")
for _, bidStr := range bidsStrArr {
bid, _ := strconv.Atoi(bidStr)
bids = append(bids, int64(bid))
}
shard_kvcli.DeleteBucketDatas(gid, bids)
shardKvCli.DeleteBucketDatas(gid, bids)
case "insertbucketkv":
gid, _ := strconv.Atoi(os.Args[3])
bid, _ := strconv.Atoi(os.Args[4])
bucket_datas := &shardkvserver.BucketDatasVo{}
bucket_datas.Datas = make(map[int]map[string]string)
bucketDatas := &shardkvserver.BucketDatasVo{}
bucketDatas.Datas = make(map[int]map[string]string)
kv := map[string]string{os.Args[5]: os.Args[6]}
bucket_datas.Datas[bid] = kv
datas, _ := json.Marshal(bucket_datas)
shard_kvcli.InsertBucketDatas(gid, []int64{int64(bid)}, datas)
bucketDatas.Datas[bid] = kv
datas, _ := json.Marshal(bucketDatas)
shardKvCli.InsertBucketDatas(gid, []int64{int64(bid)}, datas)
}
go func() {
sig := <-sigs
fmt.Println(sig)
for _, cli := range shard_kvcli.GetCsClient().GetRpcClis() {
for _, cli := range shardKvCli.GetCsClient().GetRpcClis() {
cli.CloseAllConn()
}
shard_kvcli.GetRpcClient().CloseAllConn()
shardKvCli.GetRpcClient().CloseAllConn()
os.Exit(-1)
}()
}
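
For reference, the payload that insertbucketkv builds above marshals to plain nested JSON. A minimal sketch, assuming only the BucketDatasVo shape visible in this diff (an exported Datas map[int]map[string]string):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mirrors shardkvserver.BucketDatasVo as shown in the diff
type BucketDatasVo struct {
	Datas map[int]map[string]string
}

func main() {
	payload := &BucketDatasVo{Datas: map[int]map[string]string{
		4: {"k1": "v1"}, // bucket 4 carries one key/value pair
	}}
	buf, _ := json.Marshal(payload)
	fmt.Println(string(buf)) // {"Datas":{"4":{"k1":"v1"}}}
}
```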
30 changes: 14 additions & 16 deletions cmd/shardsvr/shardsvr.go
@@ -37,7 +37,6 @@ import (

"github.com/eraft-io/eraft/shardkvserver"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)

func main() {
@@ -49,33 +48,33 @@ func main() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

node_id_str := os.Args[1]
node_id, err := strconv.Atoi(node_id_str)
nodeIdStr := os.Args[1]
nodeID, err := strconv.Atoi(nodeIdStr)
if err != nil {
panic(err)
}

gid_str := os.Args[2]
gid, err := strconv.Atoi(gid_str)
gidStr := os.Args[2]
gid, err := strconv.Atoi(gidStr)
if err != nil {
panic(err)
}

svr_addrs := strings.Split(os.Args[4], ",")
svr_peer_map := make(map[int]string)
for i, addr := range svr_addrs {
svr_peer_map[i] = addr
svrAddrs := strings.Split(os.Args[4], ",")
svrPeerMap := make(map[int]string)
for i, addr := range svrAddrs {
svrPeerMap[i] = addr
}

shard_svr := shardkvserver.MakeShardKVServer(svr_peer_map, int64(node_id), gid, os.Args[3])
lis, err := net.Listen("tcp", svr_peer_map[node_id])
shardSvr := shardkvserver.MakeShardKVServer(svrPeerMap, int64(nodeID), gid, os.Args[3])
lis, err := net.Listen("tcp", svrPeerMap[nodeID])
if err != nil {
fmt.Printf("failed to listen: %v", err)
return
}
fmt.Printf("server listen on: %s \n", svr_peer_map[node_id])
fmt.Printf("server listen on: %s \n", svrPeerMap[nodeID])
s := grpc.NewServer()
pb.RegisterRaftServiceServer(s, shard_svr)
pb.RegisterRaftServiceServer(s, shardSvr)

sigChan := make(chan os.Signal, 1)

@@ -84,12 +83,11 @@ func main() {
go func() {
sig := <-sigs
fmt.Println(sig)
shard_svr.GetRf().CloseEndsConn()
shard_svr.CloseApply()
shardSvr.GetRf().CloseEndsConn()
shardSvr.CloseApply()
os.Exit(-1)
}()

reflection.Register(s)
err = s.Serve(lis)
if err != nil {
fmt.Printf("failed to serve: %v", err)
6 changes: 3 additions & 3 deletions common/common.go
@@ -47,11 +47,11 @@ func Key2BucketID(key string) int {
}

func CRC32KeyHash(k string, base int) int {
bucket_id := 0
bucketID := 0
crc32q := crc32.MakeTable(0xD5828281)
sum := crc32.Checksum([]byte(k), crc32q)
bucket_id = int(sum) % NBuckets
return bucket_id
bucketID = int(sum) % NBuckets
return bucketID
}

func Int64ArrToIntArr(in []int64) []int {
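
As context for this hunk: CRC32KeyHash checksums the key with the 0xD5828281 table and mods the sum into a bucket (note it ignores its base parameter and always uses NBuckets). A runnable sketch of that routing; the NBuckets value here is an assumption:

```go
package main

import (
	"fmt"
	"hash/crc32"
)

const NBuckets = 10 // assumption: stands in for common.NBuckets

// bucketOf mirrors CRC32KeyHash: checksum the key, then mod into a bucket
func bucketOf(key string) int {
	table := crc32.MakeTable(0xD5828281)
	sum := crc32.Checksum([]byte(key), table)
	return int(sum) % NBuckets
}

func main() {
	for _, k := range []string{"user:1", "user:2", "user:3"} {
		fmt.Printf("%s -> bucket %d\n", k, bucketOf(k)) // same key, same bucket, every time
	}
}
```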
85 changes: 42 additions & 43 deletions metaserver/client.go
@@ -40,9 +40,9 @@ import (

type MetaSvrCli struct {
endpoints []*raftcore.RaftPeerNode
leaderId int64
clientId int64
commandId int64
leaderID int64
clientID int64
commandID int64
}

func nrand() int64 {
@@ -51,32 +51,31 @@ func nrand() int64 {
return bigx.Int64()
}

func MakeMetaSvrClient(targetId uint64, targetAddrs []string) *MetaSvrCli {

mate_svr_cli := &MetaSvrCli{
leaderId: 0,
clientId: nrand(),
commandId: 0,
func MakeMetaSvrClient(targetID uint64, targetAddrs []string) *MetaSvrCli {
mateSvrCli := &MetaSvrCli{
leaderID: 0,
clientID: nrand(),
commandID: 0,
}

for _, addr := range targetAddrs {
cli := raftcore.MakeRaftPeerNode(addr, targetId)
mate_svr_cli.endpoints = append(mate_svr_cli.endpoints, cli)
cli := raftcore.MakeRaftPeerNode(addr, targetID)
mateSvrCli.endpoints = append(mateSvrCli.endpoints, cli)
}

return mate_svr_cli
return mateSvrCli
}

func (meta_svr_cli *MetaSvrCli) GetRpcClis() []*raftcore.RaftPeerNode {
return meta_svr_cli.endpoints
func (metaSvrCli *MetaSvrCli) GetRpcClis() []*raftcore.RaftPeerNode {
return metaSvrCli.endpoints
}

func (meta_svr_cli *MetaSvrCli) Query(ver int64) *Config {
conf_req := &pb.ConfigRequest{
func (metaSvrCli *MetaSvrCli) Query(ver int64) *Config {
confReq := &pb.ConfigRequest{
OpType: pb.ConfigOpType_OpQuery,
ConfigVersion: ver,
}
resp := meta_svr_cli.CallDoConfigRpc(conf_req)
resp := metaSvrCli.CallDoConfigRpc(confReq)
cf := &Config{}
if resp != nil {
cf.Version = int(resp.Config.ConfigVersion)
@@ -92,60 +91,60 @@ func (meta_svr_cli *MetaSvrCli) Query(ver int64) *Config {
return cf
}

func (meta_svr_cli *MetaSvrCli) Move(bucket_id, gid int) *pb.ConfigResponse {
conf_req := &pb.ConfigRequest{
func (metaSvrCli *MetaSvrCli) Move(bucketID, gid int) *pb.ConfigResponse {
confReq := &pb.ConfigRequest{
OpType: pb.ConfigOpType_OpMove,
BucketId: int64(bucket_id),
BucketId: int64(bucketID),
Gid: int64(gid),
}
return meta_svr_cli.CallDoConfigRpc(conf_req)
return metaSvrCli.CallDoConfigRpc(confReq)
}

func (meta_svr_cli *MetaSvrCli) Join(servers map[int64]string) *pb.ConfigResponse {
conf_req := &pb.ConfigRequest{
func (metaSvrCli *MetaSvrCli) Join(servers map[int64]string) *pb.ConfigResponse {
confReq := &pb.ConfigRequest{
OpType: pb.ConfigOpType_OpJoin,
Servers: servers,
}
return meta_svr_cli.CallDoConfigRpc(conf_req)
return metaSvrCli.CallDoConfigRpc(confReq)
}

func (meta_svr_cli *MetaSvrCli) Leave(gids []int64) *pb.ConfigResponse {
conf_req := &pb.ConfigRequest{
func (metaSvrCli *MetaSvrCli) Leave(gids []int64) *pb.ConfigResponse {
confReq := &pb.ConfigRequest{
OpType: pb.ConfigOpType_OpLeave,
Gids: gids,
}
return meta_svr_cli.CallDoConfigRpc(conf_req)
return metaSvrCli.CallDoConfigRpc(confReq)
}

func (meta_svr_cli *MetaSvrCli) CallDoConfigRpc(req *pb.ConfigRequest) *pb.ConfigResponse {
func (metaSvrCli *MetaSvrCli) CallDoConfigRpc(req *pb.ConfigRequest) *pb.ConfigResponse {
var err error
conf_resp := &pb.ConfigResponse{}
conf_resp.Config = &pb.ServerConfig{}
for _, end := range meta_svr_cli.endpoints {
conf_resp, err = (*end.GetRaftServiceCli()).DoConfig(context.Background(), req)
confResp := &pb.ConfigResponse{}
confResp.Config = &pb.ServerConfig{}
for _, end := range metaSvrCli.endpoints {
confResp, err = (*end.GetRaftServiceCli()).DoConfig(context.Background(), req)
if err != nil {
continue
}
switch conf_resp.ErrCode {
switch confResp.ErrCode {
case common.ErrCodeNoErr:
meta_svr_cli.commandId++
return conf_resp
metaSvrCli.commandID++
return confResp
case common.ErrCodeWrongLeader:
conf_resp, err := (*meta_svr_cli.endpoints[conf_resp.LeaderId].GetRaftServiceCli()).DoConfig(context.Background(), req)
confResp, err := (*metaSvrCli.endpoints[confResp.LeaderId].GetRaftServiceCli()).DoConfig(context.Background(), req)
if err != nil {
logger.ELogger().Sugar().Debugf("a node in cluster is down : ", err.Error())
continue
}
if conf_resp.ErrCode == common.ErrCodeNoErr {
meta_svr_cli.commandId++
return conf_resp
if confResp.ErrCode == common.ErrCodeNoErr {
metaSvrCli.commandID++
return confResp
}
if conf_resp.ErrCode == common.ErrCodeExecTimeout {
if confResp.ErrCode == common.ErrCodeExecTimeout {
logger.ELogger().Sugar().Debug("exec timeout")
return conf_resp
return confResp
}
return conf_resp
return confResp
}
}
return conf_resp
return confResp
}
8 changes: 4 additions & 4 deletions metaserver/meta.go
@@ -54,11 +54,11 @@ func (cf *Config) GetGroup2Buckets() map[int][]int {
const ExecTimeout = 3 * time.Second

func deepCopy(groups map[int][]string) map[int][]string {
new_group := make(map[int][]string)
for gid, severs := range groups {
newGroup := make(map[int][]string)
for gID, severs := range groups {
newSvrs := make([]string, len(severs))
copy(newSvrs, severs)
new_group[gid] = newSvrs
newGroup[gID] = newSvrs
}
return new_group
return newGroup
}
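
The deepCopy above exists because copying a map of slices shallowly still shares each slice's backing array, so a new config version could silently mutate an old one. A quick illustration:

```go
package main

import "fmt"

func main() {
	orig := map[int][]string{1: {"node-a"}}

	shallow := map[int][]string{}
	for gid, svrs := range orig {
		shallow[gid] = svrs // same backing array as orig's slice
	}
	shallow[1][0] = "node-b"
	fmt.Println(orig[1][0]) // "node-b" — the original was mutated too

	orig[1][0] = "node-a"
	deep := map[int][]string{}
	for gid, svrs := range orig {
		cp := make([]string, len(svrs))
		copy(cp, svrs) // fresh backing array, exactly as deepCopy does
		deep[gid] = cp
	}
	deep[1][0] = "node-c"
	fmt.Println(orig[1][0]) // "node-a" — unaffected
}
```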
50 changes: 25 additions & 25 deletions metaserver/metaserver.go
@@ -55,29 +55,29 @@ type MetaServer struct {
}

func MakeMetaServer(peerMaps map[int]string, nodeId int) *MetaServer {
client_ends := []*raftcore.RaftPeerNode{}
clientEnds := []*raftcore.RaftPeerNode{}
for id, addr := range peerMaps {
new_end := raftcore.MakeRaftPeerNode(addr, uint64(id))
client_ends = append(client_ends, new_end)
newEnd := raftcore.MakeRaftPeerNode(addr, uint64(id))
clientEnds = append(clientEnds, newEnd)
}
newApplyCh := make(chan *pb.ApplyMsg)

newdb_eng := storage.EngineFactory("leveldb", "./data/db/metanode_"+strconv.Itoa(nodeId))
logdb_eng := storage.EngineFactory("leveldb", "./data/log/metanode_"+strconv.Itoa(nodeId))
newdbEng := storage.EngineFactory("leveldb", "./data/db/metanode_"+strconv.Itoa(nodeId))
logdbEng := storage.EngineFactory("leveldb", "./data/log/metanode_"+strconv.Itoa(nodeId))

newRf := raftcore.MakeRaft(client_ends, int64(nodeId), logdb_eng, newApplyCh, 50, 150)
meta_server := &MetaServer{
newRf := raftcore.MakeRaft(clientEnds, int64(nodeId), logdbEng, newApplyCh, 50, 150)
metaServer := &MetaServer{
Rf: newRf,
applyCh: newApplyCh,
dead: 0,
stm: NewMemConfigStm(newdb_eng),
stm: NewMemConfigStm(newdbEng),
notifyChans: make(map[int]chan *pb.ConfigResponse),
}

meta_server.stopApplyCh = make(chan interface{})
metaServer.stopApplyCh = make(chan interface{})

go meta_server.ApplingToStm(meta_server.stopApplyCh)
return meta_server
go metaServer.ApplingToStm(metaServer.stopApplyCh)
return metaServer
}

func (s *MetaServer) StopApply() {
@@ -94,20 +94,20 @@ func (s *MetaServer) getNotifyChan(index int) chan *pb.ConfigResponse {
func (s *MetaServer) DoConfig(ctx context.Context, req *pb.ConfigRequest) (*pb.ConfigResponse, error) {
logger.ELogger().Sugar().Debugf("DoConfig %s", req.String())

cmd_resp := &pb.ConfigResponse{}
cmdResp := &pb.ConfigResponse{}

req_bytes, err := json.Marshal(req)
if err != nil {
cmd_resp.ErrMsg = err.Error()
return cmd_resp, err
cmdResp.ErrMsg = err.Error()
return cmdResp, err
}

index, _, isLeader := s.Rf.Propose(req_bytes)
if !isLeader {
cmd_resp.ErrMsg = "is not leader"
cmd_resp.ErrCode = common.ErrCodeWrongLeader
cmd_resp.LeaderId = s.Rf.GetLeaderId()
return cmd_resp, nil
cmdResp.ErrMsg = "is not leader"
cmdResp.ErrCode = common.ErrCodeWrongLeader
cmdResp.LeaderId = s.Rf.GetLeaderId()
return cmdResp, nil
}

s.mu.Lock()
@@ -116,13 +116,13 @@ func (s *MetaServer) DoConfig(ctx context.Context, req *pb.ConfigRequest) (*pb.C

select {
case res := <-ch:
cmd_resp.Config = res.Config
cmd_resp.ErrMsg = res.ErrMsg
cmd_resp.ErrCode = common.ErrCodeNoErr
cmdResp.Config = res.Config
cmdResp.ErrMsg = res.ErrMsg
cmdResp.ErrCode = common.ErrCodeNoErr
case <-time.After(ExecTimeout):
cmd_resp.ErrMsg = "server exec timeout"
cmd_resp.ErrCode = common.ErrCodeExecTimeout
return cmd_resp, errors.New("ExecTimeout")
cmdResp.ErrMsg = "server exec timeout"
cmdResp.ErrCode = common.ErrCodeExecTimeout
return cmdResp, errors.New("ExecTimeout")
}

go func() {
@@ -131,7 +131,7 @@ func (s *MetaServer) DoConfig(ctx context.Context, req *pb.ConfigRequest) (*pb.C
s.mu.Unlock()
}()

return cmd_resp, nil
return cmdResp, nil
}

func (s *MetaServer) RequestVote(ctx context.Context, req *pb.RequestVoteRequest) (*pb.RequestVoteResponse, error) {
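
DoConfig above follows a propose-then-wait pattern: marshal the request, hand the bytes to raft, then block on a per-log-index notify channel bounded by ExecTimeout. A stripped-down sketch of just that control flow, using hypothetical function values rather than the repo's types:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// proposeAndWait sketches DoConfig's control flow: propose bytes to raft,
// then wait on the notify channel for that log index, bounded by a timeout.
func proposeAndWait(
	propose func([]byte) (index int, isLeader bool),
	notifyChan func(index int) chan string,
	payload []byte,
) (string, error) {
	idx, isLeader := propose(payload)
	if !isLeader {
		return "", errors.New("is not leader")
	}
	select {
	case res := <-notifyChan(idx):
		return res, nil
	case <-time.After(3 * time.Second): // ExecTimeout in the real server
		return "", errors.New("ExecTimeout")
	}
}

func main() {
	ch := make(chan string, 1)
	propose := func(b []byte) (int, bool) { ch <- "applied:" + string(b); return 1, true }
	notify := func(int) chan string { return ch }
	res, err := proposeAndWait(propose, notify, []byte("join"))
	fmt.Println(res, err) // applied:join <nil>
}
```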
12 changes: 6 additions & 6 deletions metaserver/metaserver_test.go
@@ -33,22 +33,22 @@ import (
)

func TestRangeArr(t *testing.T) {
var new_buckets [common.NBuckets]int
new_buckets[0] = 2
for k, v := range new_buckets {
var newBuckets [common.NBuckets]int
newBuckets[0] = 2
for k, v := range newBuckets {
t.Logf("k -> %d, v -> %d", k, v)
}
}

func TestAddGroups(t *testing.T) {
new_db_eng, err := storage_eng.MakeLevelDBKvStore("./conf_data/" + "/test")
newDbEng, err := storage_eng.MakeLevelDBKvStore("./conf_data/" + "/test")
if err != nil {
raftcore.PrintDebugLog("boot storage engine err!")
panic(err)
}
mem_conf_stm := NewMemConfigStm(new_db_eng)
memConfStm := NewMemConfigStm(newDbEng)
for i := 0; i < 1000; i++ {
conf, _ := mem_conf_stm.Query(-1)
conf, _ := memConfStm.Query(-1)
t.Logf("%v %d", conf, i)
}
}
78 changes: 39 additions & 39 deletions metaserver/metastm.go
@@ -40,7 +40,7 @@ const CUR_VERSION_KEY = "CUR_CONF_VERSION"
type ConfigStm interface {
Join(groups map[int][]string) error
Leave(gids []int) error
Move(bucket_id, gid int) error
Move(bucketID, gID int) error
Query(num int) (Config, error)
}

@@ -52,7 +52,7 @@ type MemConfigStm struct {
func NewMemConfigStm(dbEng storage.KvStore) *MemConfigStm {
// check if has default conf
_, err := dbEng.Get(CF_PREFIX + strconv.Itoa(0))
conf_stm := &MemConfigStm{dbEng: dbEng, curConfVersion: 0}
confStm := &MemConfigStm{dbEng: dbEng, curConfVersion: 0}
if err != nil {
defaultConfig := DefaultConfig()
defaultConfigBytes, err := json.Marshal(defaultConfig)
@@ -61,105 +61,105 @@ func NewMemConfigStm(dbEng storage.KvStore) *MemConfigStm {
}
// init conf
logger.ELogger().Sugar().Debugf("init conf -> " + string(defaultConfigBytes))
if err := conf_stm.dbEng.Put(CF_PREFIX+strconv.Itoa(0), string(defaultConfigBytes)); err != nil {
if err := confStm.dbEng.Put(CF_PREFIX+strconv.Itoa(0), string(defaultConfigBytes)); err != nil {
panic(err)
}
if err := conf_stm.dbEng.Put(CUR_VERSION_KEY, strconv.Itoa(conf_stm.curConfVersion)); err != nil {
if err := confStm.dbEng.Put(CUR_VERSION_KEY, strconv.Itoa(confStm.curConfVersion)); err != nil {
panic(err)
}
return conf_stm
return confStm
}
version_str, err := dbEng.Get(CUR_VERSION_KEY)
if err != nil {
panic(err)
}
version_int, _ := strconv.Atoi(version_str)
conf_stm.curConfVersion = version_int
return conf_stm
confStm.curConfVersion = version_int
return confStm
}

func (cfStm *MemConfigStm) Join(groups map[int][]string) error {
conf_bytes, err := cfStm.dbEng.Get(CF_PREFIX + strconv.Itoa(cfStm.curConfVersion))
confBytes, err := cfStm.dbEng.Get(CF_PREFIX + strconv.Itoa(cfStm.curConfVersion))
if err != nil {
return err
}
last_conf := &Config{}
json.Unmarshal([]byte(conf_bytes), last_conf)
new_config := Config{cfStm.curConfVersion + 1, last_conf.Buckets, deepCopy(last_conf.Groups)}
lastConf := &Config{}
json.Unmarshal([]byte(confBytes), lastConf)
newConfig := Config{cfStm.curConfVersion + 1, lastConf.Buckets, deepCopy(lastConf.Groups)}
for gid, servers := range groups {
if _, ok := new_config.Groups[gid]; !ok {
if _, ok := newConfig.Groups[gid]; !ok {
newSvrs := make([]string, len(servers))
copy(newSvrs, servers)
new_config.Groups[gid] = newSvrs
newConfig.Groups[gid] = newSvrs
}
}
s2g := new_config.GetGroup2Buckets()
s2g := newConfig.GetGroup2Buckets()
var new_buckets [common.NBuckets]int
for gid, buckets := range s2g {
for _, bid := range buckets {
new_buckets[bid] = gid
}
}
new_config.Buckets = new_buckets
new_config_bytes, _ := json.Marshal(new_config)
newConfig.Buckets = new_buckets
newConfigBytes, _ := json.Marshal(newConfig)
cfStm.dbEng.Put(CUR_VERSION_KEY, strconv.Itoa(cfStm.curConfVersion+1))
cfStm.dbEng.Put(CF_PREFIX+strconv.Itoa(cfStm.curConfVersion+1), string(new_config_bytes))
cfStm.dbEng.Put(CF_PREFIX+strconv.Itoa(cfStm.curConfVersion+1), string(newConfigBytes))
cfStm.curConfVersion += 1
return nil
}

func (cfStm *MemConfigStm) Leave(gids []int) error {
conf_bytes, err := cfStm.dbEng.Get(CF_PREFIX + strconv.Itoa(cfStm.curConfVersion))
confBytes, err := cfStm.dbEng.Get(CF_PREFIX + strconv.Itoa(cfStm.curConfVersion))
if err != nil {
return err
}
last_conf := &Config{}
json.Unmarshal([]byte(conf_bytes), last_conf)
new_conf := Config{cfStm.curConfVersion + 1, last_conf.Buckets, deepCopy(last_conf.Groups)}
lastConf := &Config{}
json.Unmarshal([]byte(confBytes), lastConf)
newConf := Config{cfStm.curConfVersion + 1, lastConf.Buckets, deepCopy(lastConf.Groups)}
for _, gid := range gids {
delete(new_conf.Groups, gid)
delete(newConf.Groups, gid)
}
var newBuckets [common.NBuckets]int
new_conf.Buckets = newBuckets
new_config_bytes, _ := json.Marshal(new_conf)
newConf.Buckets = newBuckets
newConfigBytes, _ := json.Marshal(newConf)
cfStm.dbEng.Put(CUR_VERSION_KEY, strconv.Itoa(cfStm.curConfVersion+1))
cfStm.dbEng.Put(CF_PREFIX+strconv.Itoa(cfStm.curConfVersion+1), string(new_config_bytes))
cfStm.dbEng.Put(CF_PREFIX+strconv.Itoa(cfStm.curConfVersion+1), string(newConfigBytes))
cfStm.curConfVersion += 1
return nil
}

func (cfStm *MemConfigStm) Move(bid, gid int) error {
conf_bytes, err := cfStm.dbEng.Get(CF_PREFIX + strconv.Itoa(cfStm.curConfVersion))
confBytes, err := cfStm.dbEng.Get(CF_PREFIX + strconv.Itoa(cfStm.curConfVersion))
if err != nil {
return err
}
last_conf := &Config{}
json.Unmarshal([]byte(conf_bytes), last_conf)
new_conf := Config{cfStm.curConfVersion + 1, last_conf.Buckets, deepCopy(last_conf.Groups)}
new_conf.Buckets[bid] = gid
new_config_bytes, _ := json.Marshal(new_conf)
lastConf := &Config{}
json.Unmarshal([]byte(confBytes), lastConf)
newConf := Config{cfStm.curConfVersion + 1, lastConf.Buckets, deepCopy(lastConf.Groups)}
newConf.Buckets[bid] = gid
newConfigBytes, _ := json.Marshal(newConf)
cfStm.dbEng.Put(CUR_VERSION_KEY, strconv.Itoa(cfStm.curConfVersion+1))
cfStm.dbEng.Put(CF_PREFIX+strconv.Itoa(cfStm.curConfVersion+1), string(new_config_bytes))
cfStm.dbEng.Put(CF_PREFIX+strconv.Itoa(cfStm.curConfVersion+1), string(newConfigBytes))
cfStm.curConfVersion += 1
return nil
}

func (cfStm *MemConfigStm) Query(version int) (Config, error) {
if version < 0 || version >= cfStm.curConfVersion {
last_conf := &Config{}
lastConf := &Config{}
logger.ELogger().Sugar().Debugf("query cur version -> " + strconv.Itoa(cfStm.curConfVersion))
confBytes, err := cfStm.dbEng.Get(CF_PREFIX + strconv.Itoa(cfStm.curConfVersion))
if err != nil {
return DefaultConfig(), err
}
json.Unmarshal([]byte(confBytes), last_conf)
return *last_conf, nil
json.Unmarshal([]byte(confBytes), lastConf)
return *lastConf, nil
}
conf_bytes, err := cfStm.dbEng.Get(CF_PREFIX + strconv.Itoa(version))
confBytes, err := cfStm.dbEng.Get(CF_PREFIX + strconv.Itoa(version))
if err != nil {
return DefaultConfig(), err
}
spec_conf := &Config{}
json.Unmarshal([]byte(conf_bytes), spec_conf)
return *spec_conf, nil
specConf := &Config{}
json.Unmarshal([]byte(confBytes), specConf)
return *specConf, nil
}
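
Every Join/Leave/Move above follows one write pattern: read the config at curConfVersion, build version+1, then persist both the new config blob and the version pointer, leaving older versions queryable by key. A sketch against a plain map standing in for storage.KvStore (the key literals are assumptions):

```go
package main

import (
	"fmt"
	"strconv"
)

const cfPrefix = "CF_"                   // assumption: stands in for CF_PREFIX
const curVersionKey = "CUR_CONF_VERSION" // assumption: stands in for CUR_VERSION_KEY

// bumpConfig persists the next config version and moves the version pointer,
// mirroring the tail of Join/Leave/Move above.
func bumpConfig(db map[string]string, curVersion int, newConfJSON string) int {
	next := curVersion + 1
	db[curVersionKey] = strconv.Itoa(next)        // move the version pointer
	db[cfPrefix+strconv.Itoa(next)] = newConfJSON // persist the new snapshot
	return next
}

func main() {
	db := map[string]string{cfPrefix + "0": `{"Version":0}`, curVersionKey: "0"}
	v := bumpConfig(db, 0, `{"Version":1,"Groups":{"1":["node-a"]}}`)
	fmt.Println(v, db[cfPrefix+"1"]) // older versions stay queryable by their own key
}
```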
99 changes: 52 additions & 47 deletions raftcore/raft.go
@@ -106,10 +106,10 @@ func MakeRaft(peers []*RaftPeerNode, me int64, newdbEng storage_eng.KvStore, app
}
rf.curTerm, rf.votedFor = rf.logs.ReadRaftState()
rf.applyCond = sync.NewCond(&rf.mu)
last_log := rf.logs.GetLast()
lastLog := rf.logs.GetLast()
for _, peer := range peers {
logger.ELogger().Sugar().Debugf("peer addr %s id %d", peer.addr, peer.id)
rf.matchIdx[peer.id], rf.nextIdx[peer.id] = 0, int(last_log.Index+1)
rf.matchIdx[peer.id], rf.nextIdx[peer.id] = 0, int(lastLog.Index+1)
if int64(peer.id) != me {
rf.replicatorCond[peer.id] = sync.NewCond(&sync.Mutex{})
go rf.Replicator(peer)
@@ -188,9 +188,9 @@ func (rf *Raft) HandleRequestVote(req *pb.RequestVoteRequest, resp *pb.RequestVo
rf.curTerm, rf.votedFor = req.Term, -1
}

last_log := rf.logs.GetLast()
lastLog := rf.logs.GetLast()

if req.LastLogTerm < int64(last_log.Term) || (req.LastLogTerm == int64(last_log.Term) && req.LastLogIndex < last_log.Index) {
if req.LastLogTerm < int64(lastLog.Term) || (req.LastLogTerm == int64(lastLog.Term) && req.LastLogIndex < lastLog.Index) {
resp.Term, resp.VoteGranted = rf.curTerm, false
return
}
@@ -240,11 +240,11 @@ func (rf *Raft) HandleAppendEntries(req *pb.AppendEntriesRequest, resp *pb.Appen
if !rf.MatchLog(req.PrevLogTerm, req.PrevLogIndex) {
resp.Term = rf.curTerm
resp.Success = false
last_index := rf.logs.GetLast().Index
if last_index < req.PrevLogIndex {
logger.ELogger().Sugar().Warnf("log confict with term %d, index %d", -1, last_index+1)
lastIndex := rf.logs.GetLast().Index
if lastIndex < req.PrevLogIndex {
logger.ELogger().Sugar().Warnf("log confict with term %d, index %d", -1, lastIndex+1)
resp.ConflictTerm = -1
resp.ConflictIndex = last_index + 1
resp.ConflictIndex = lastIndex + 1
} else {
first_index := rf.logs.GetFirst().Index
resp.ConflictTerm = int64(rf.logs.GetEntry(req.PrevLogIndex).Term)
@@ -257,10 +257,10 @@ func (rf *Raft) HandleAppendEntries(req *pb.AppendEntriesRequest, resp *pb.Appen
return
}

first_index := rf.logs.GetFirst().Index
firstIndex := rf.logs.GetFirst().Index
for index, entry := range req.Entries {
if int(entry.Index-first_index) >= rf.logs.LogItemCount() || rf.logs.GetEntry(entry.Index).Term != entry.Term {
rf.logs.EraseAfter(entry.Index-first_index, true)
if int(entry.Index-firstIndex) >= rf.logs.LogItemCount() || rf.logs.GetEntry(entry.Index).Term != entry.Term {
rf.logs.EraseAfter(entry.Index-firstIndex, true)
for _, newEnt := range req.Entries[index:] {
rf.logs.Append(newEnt)
}
@@ -517,72 +517,77 @@ func (rf *Raft) replicateOneRound(peer *RaftPeerNode) {
rf.mu.RUnlock()
return
}
prev_log_index := uint64(rf.nextIdx[peer.id] - 1)
logger.ELogger().Sugar().Debugf("leader prev log index %d", prev_log_index)
if prev_log_index < uint64(rf.logs.GetFirst().Index) {
first_log := rf.logs.GetFirst()
prevLogIndex := uint64(rf.nextIdx[peer.id] - 1)
logger.ELogger().Sugar().Debugf("leader prev log index %d", prevLogIndex)
if prevLogIndex < uint64(rf.logs.GetFirst().Index) {
firstLog := rf.logs.GetFirst()

// snapShotContext, err := rf.ReadSnapshot()
// if err != nil {
// logger.ELogger().Sugar().Errorf("read snapshot data error %v", err)
// }

// TODO: send kv leveldb snapshot
snap_shot_req := &pb.InstallSnapshotRequest{
snapShotReq := &pb.InstallSnapshotRequest{
Term: rf.curTerm,
LeaderId: int64(rf.id),
LastIncludedIndex: first_log.Index,
LastIncludedTerm: int64(first_log.Term),
LastIncludedIndex: firstLog.Index,
LastIncludedTerm: int64(firstLog.Term),
// Data: snapShotContext,
}

rf.mu.RUnlock()

logger.ELogger().Sugar().Debugf("send snapshot to %s with %s", peer.addr, snap_shot_req.String())
logger.ELogger().Sugar().Debugf("send snapshot to %s with %s", peer.addr, snapShotReq.String())

snapshot_resp, err := (*peer.raftServiceCli).Snapshot(context.Background(), snap_shot_req)
snapshotResp, err := (*peer.raftServiceCli).Snapshot(context.Background(), snapShotReq)
if err != nil {
logger.ELogger().Sugar().Errorf("send snapshot to %s failed %v", peer.addr, err.Error())
}

rf.mu.Lock()
logger.ELogger().Sugar().Debugf("send snapshot to %s with resp %s", peer.addr, snapshot_resp.String())
logger.ELogger().Sugar().Debugf("send snapshot to %s with resp %s", peer.addr, snapshotResp.String())

if snapshot_resp != nil && rf.role == NodeRoleLeader &&
rf.curTerm == snap_shot_req.Term && snapshot_resp.Term > rf.curTerm {
if snapshotResp != nil && rf.role == NodeRoleLeader &&
rf.curTerm == snapShotReq.Term && snapshotResp.Term > rf.curTerm {
rf.SwitchRaftNodeRole(NodeRoleFollower)
rf.curTerm = snapshot_resp.Term
rf.curTerm = snapshotResp.Term
rf.votedFor = -1
rf.PersistRaftState()
rf.mu.Unlock()
return
}
logger.ELogger().Sugar().Debugf("set peer %d matchIdx %d", peer.id, snap_shot_req.LastIncludedIndex)
rf.matchIdx[peer.id] = int(snap_shot_req.LastIncludedIndex)
rf.nextIdx[peer.id] = int(snap_shot_req.LastIncludedIndex) + 1
logger.ELogger().Sugar().Debugf("set peer %d matchIdx %d", peer.id, snapShotReq.LastIncludedIndex)
rf.matchIdx[peer.id] = int(snapShotReq.LastIncludedIndex)
rf.nextIdx[peer.id] = int(snapShotReq.LastIncludedIndex) + 1
rf.mu.Unlock()
return
}

first_index := rf.logs.GetFirst().Index
logger.ELogger().Sugar().Debugf("first log index %d", first_index)
new_ents, _ := rf.logs.EraseBefore(int64(prev_log_index)+1, false)
firstIndex := rf.logs.GetFirst().Index
logger.ELogger().Sugar().Debugf("first log index %d", firstIndex)
new_ents, _ := rf.logs.EraseBefore(int64(prevLogIndex)+1, false)
entries := make([]*pb.Entry, len(new_ents))
copy(entries, new_ents)

append_ent_req := &pb.AppendEntriesRequest{
appendEntReq := &pb.AppendEntriesRequest{
Term: rf.curTerm,
LeaderId: int64(rf.id),
PrevLogIndex: int64(prev_log_index),
PrevLogTerm: int64(rf.logs.GetEntry(int64(prev_log_index)).Term),
PrevLogIndex: int64(prevLogIndex),
PrevLogTerm: int64(rf.logs.GetEntry(int64(prevLogIndex)).Term),
Entries: entries,
LeaderCommit: rf.commitIdx,
}
rf.mu.RUnlock()

// send empty ae to peers
resp, err := (*peer.raftServiceCli).AppendEntries(context.Background(), append_ent_req)
resp, err := (*peer.raftServiceCli).AppendEntries(context.Background(), appendEntReq)
if err != nil {
logger.ELogger().Sugar().Errorf("send append entries to %s failed %v\n", peer.addr, err.Error())
}
if rf.role == NodeRoleLeader && rf.curTerm == append_ent_req.Term && resp != nil && resp.Success {
if rf.role == NodeRoleLeader && rf.curTerm == appendEntReq.Term && resp != nil && resp.Success {
// deal with appendRnt resp
logger.ELogger().Sugar().Debugf("send heart beat to %s success", peer.addr)
rf.matchIdx[peer.id] = int(append_ent_req.PrevLogIndex) + len(append_ent_req.Entries)
rf.matchIdx[peer.id] = int(appendEntReq.PrevLogIndex) + len(appendEntReq.Entries)
rf.nextIdx[peer.id] = rf.matchIdx[peer.id] + 1
rf.advanceCommitIndexForLeader()
return
@@ -600,7 +605,7 @@ func (rf *Raft) replicateOneRound(peer *RaftPeerNode) {
if resp.Term == rf.curTerm {
rf.nextIdx[peer.id] = int(resp.ConflictIndex)
if resp.ConflictTerm != -1 {
for i := append_ent_req.PrevLogIndex; i >= int64(first_index); i-- {
for i := appendEntReq.PrevLogIndex; i >= int64(firstIndex); i-- {
if rf.logs.GetEntry(i).Term == uint64(resp.ConflictTerm) {
rf.nextIdx[peer.id] = int(i + 1)
break
@@ -620,10 +625,10 @@ func (rf *Raft) Applier() {
rf.applyCond.Wait()
}

commit_index, last_applied := rf.commitIdx, rf.lastApplied
entries := make([]*pb.Entry, commit_index-last_applied)
copy(entries, rf.logs.GetRange(last_applied+1, commit_index))
logger.ELogger().Sugar().Debugf("%d, applies entries %d-%d in term %d", rf.id, rf.lastApplied, commit_index, rf.curTerm)
commitIndex, lastApplied := rf.commitIdx, rf.lastApplied
entries := make([]*pb.Entry, commitIndex-lastApplied)
copy(entries, rf.logs.GetRange(lastApplied+1, commitIndex))
logger.ELogger().Sugar().Debugf("%d, applies entries %d-%d in term %d", rf.id, rf.lastApplied, commitIndex, rf.curTerm)

rf.mu.Unlock()
for _, entry := range entries {
@@ -636,25 +641,25 @@ func (rf *Raft) Applier() {
}

rf.mu.Lock()
rf.lastApplied = int64(Max(int(rf.lastApplied), int(commit_index)))
rf.lastApplied = int64(Max(int(rf.lastApplied), int(commitIndex)))
rf.mu.Unlock()
}
}

func (rf *Raft) Snapshot(snap_idx uint64, snapshotContext []byte) error {
func (rf *Raft) Snapshot(snapIdx uint64, snapshotContext []byte) error {
rf.mu.Lock()
defer rf.mu.Unlock()
rf.isSnapshoting = true
if snap_idx <= rf.logs.GetFirstLogId() {
if snapIdx <= rf.logs.GetFirstLogId() {
rf.isSnapshoting = false
return errors.New("ety index is larger than the first log index")
}
_, err := rf.logs.EraseBefore(int64(snap_idx), true)
_, err := rf.logs.EraseBefore(int64(snapIdx), true)
if err != nil {
rf.isSnapshoting = false
return err
}
if err := rf.logs.ResetFirstLogEntry(rf.curTerm, int64(snap_idx)); err != nil {
if err := rf.logs.ResetFirstLogEntry(rf.curTerm, int64(snapIdx)); err != nil {
rf.isSnapshoting = false
return err
}
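
The ConflictTerm/ConflictIndex exchange above lets the leader roll nextIdx back a whole term at a time instead of one entry per round trip. A worked sketch of the leader-side rollback; representing the log as a term-by-index map is an assumption of the sketch:

```go
package main

import "fmt"

// nextIdxAfterConflict mirrors the loop in replicateOneRound: prefer the entry
// just past the leader's last entry in conflictTerm; otherwise fall back to
// the follower's reported conflictIndex.
func nextIdxAfterConflict(termAt map[int64]uint64, prevLogIndex, firstIndex, conflictIndex, conflictTerm int64) int {
	next := int(conflictIndex)
	if conflictTerm != -1 {
		for i := prevLogIndex; i >= firstIndex; i-- {
			if termAt[i] == uint64(conflictTerm) {
				next = int(i + 1)
				break
			}
		}
	}
	return next
}

func main() {
	// leader log terms by index: 1:1 2:1 3:4 4:4 5:5
	terms := map[int64]uint64{1: 1, 2: 1, 3: 4, 4: 4, 5: 5}
	// follower rejected PrevLogIndex=5 and reported term 2 starting at index 3;
	// the leader has no term-2 entries, so nextIdx drops straight to 3
	fmt.Println(nextIdxAfterConflict(terms, 5, 1, 3, 2)) // 3
	// follower reported conflict term 4 starting at index 3; the leader's last
	// term-4 entry is index 4, so nextIdx becomes 5
	fmt.Println(nextIdxAfterConflict(terms, 5, 1, 3, 4)) // 5
}
```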
4 changes: 2 additions & 2 deletions raftcore/raft_peer_node.go
@@ -52,12 +52,12 @@ func MakeRaftPeerNode(addr string, id uint64) *RaftPeerNode {
}
conns := []*grpc.ClientConn{}
conns = append(conns, conn)
rpc_client := raftpb.NewRaftServiceClient(conn)
rpcClient := raftpb.NewRaftServiceClient(conn)
return &RaftPeerNode{
id: id,
addr: addr,
conns: conns,
raftServiceCli: &rpc_client,
raftServiceCli: &rpcClient,
}
}

94 changes: 47 additions & 47 deletions raftcore/raft_persistent_log.go
@@ -55,9 +55,9 @@ func MakePersistRaftLog(newdbEng storage_eng.KvStore) *RaftLog {
_, readBootstrapStateErr := newdbEng.GetBytesValue(BootstrapStateKey)
if err != nil && readBootstrapStateErr != nil {
logger.ELogger().Sugar().Debugf("init raft log state")
emp_ent := &pb.Entry{}
emp_ent_encode := EncodeEntry(emp_ent)
if err := newdbEng.PutBytesKv(EncodeRaftLogKey(INIT_LOG_INDEX), emp_ent_encode); err != nil {
empEnt := &pb.Entry{}
empEntEncode := EncodeEntry(empEnt)
if err := newdbEng.PutBytesKv(EncodeRaftLogKey(INIT_LOG_INDEX), empEntEncode); err != nil {
panic(err.Error())
}
if err := newdbEng.PutBytesKv(BootstrapStateKey, []byte{}); err != nil {
@@ -69,13 +69,13 @@ func MakePersistRaftLog(newdbEng storage_eng.KvStore) *RaftLog {
if err != nil {
panic(err)
}
last_idx := binary.BigEndian.Uint64(lidkBytes[len(RAFTLOG_PREFIX):])
lastIdx := binary.BigEndian.Uint64(lidkBytes[len(RAFTLOG_PREFIX):])
fidkBytes, _, err := newdbEng.SeekPrefixFirst(RAFTLOG_PREFIX)
if err != nil {
panic(err)
}
first_idx := binary.BigEndian.Uint64(fidkBytes[len(RAFTLOG_PREFIX):])
return &RaftLog{dbEng: newdbEng, lastIdx: last_idx, firstIdx: first_idx}
firstIdx := binary.BigEndian.Uint64(fidkBytes[len(RAFTLOG_PREFIX):])
return &RaftLog{dbEng: newdbEng, lastIdx: lastIdx, firstIdx: firstIdx}
}

// PersistRaftState Persistent storage raft state
@@ -84,22 +84,22 @@ func MakePersistRaftLog(newdbEng storage_eng.KvStore) *RaftLog {
func (rfLog *RaftLog) PersistRaftState(curTerm int64, votedFor int64) {
rfLog.mu.Lock()
defer rfLog.mu.Unlock()
rf_state := &RaftPersistenState{
rfState := &RaftPersistenState{
CurTerm: curTerm,
VotedFor: votedFor,
}
rfLog.dbEng.PutBytesKv(RAFT_STATE_KEY, EncodeRaftState(rf_state))
rfLog.dbEng.PutBytesKv(RAFT_STATE_KEY, EncodeRaftState(rfState))
}

// ReadRaftState
// read the persist curTerm, votedFor for node from storage engine
func (rfLog *RaftLog) ReadRaftState() (curTerm int64, votedFor int64) {
rf_bytes, err := rfLog.dbEng.GetBytesValue(RAFT_STATE_KEY)
rfBytes, err := rfLog.dbEng.GetBytesValue(RAFT_STATE_KEY)
if err != nil {
return 0, -1
}
rf_state := DecodeRaftState(rf_bytes)
return rf_state.CurTerm, rf_state.VotedFor
rfState := DecodeRaftState(rfBytes)
return rfState.CurTerm, rfState.VotedFor
}

// PersistSnapshot ...
@@ -136,28 +136,28 @@ func (rfLog *RaftLog) GetLastLogId() uint64 {
func (rfLog *RaftLog) SetEntFirstData(d []byte) error {
rfLog.mu.Lock()
defer rfLog.mu.Unlock()
first_idx := rfLog.GetFirstLogId()
encode_value, err := rfLog.dbEng.GetBytesValue(EncodeRaftLogKey(uint64(first_idx)))
firstIdx := rfLog.GetFirstLogId()
encodeValue, err := rfLog.dbEng.GetBytesValue(EncodeRaftLogKey(uint64(firstIdx)))
if err != nil {
logger.ELogger().Sugar().Panicf("get log entry with id %d error!", first_idx)
logger.ELogger().Sugar().Panicf("get log entry with id %d error!", firstIdx)
panic(err)
}
ent := DecodeEntry(encode_value)
ent.Index = int64(first_idx)
ent := DecodeEntry(encodeValue)
ent.Index = int64(firstIdx)
ent.Data = d
newent_encode := EncodeEntry(ent)
return rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(first_idx), newent_encode)
newentEncode := EncodeEntry(ent)
return rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(firstIdx), newentEncode)
}

func (rfLog *RaftLog) ResetFirstLogEntry(term int64, index int64) error {
rfLog.mu.Lock()
defer rfLog.mu.Unlock()
new_ent := &pb.Entry{}
new_ent.EntryType = pb.EntryType_EntryNormal
new_ent.Term = uint64(term)
new_ent.Index = index
newent_encode := EncodeEntry(new_ent)
if err := rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(uint64(index)), newent_encode); err != nil {
newEnt := &pb.Entry{}
newEnt.EntryType = pb.EntryType_EntryNormal
newEnt.Term = uint64(term)
newEnt.Index = index
newentEncode := EncodeEntry(newEnt)
if err := rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(uint64(index)), newentEncode); err != nil {
return err
}
rfLog.firstIdx = uint64(index)
@@ -176,9 +176,9 @@ func (rfLog *RaftLog) ReInitLogs() error {
rfLog.firstIdx = 0
rfLog.lastIdx = 0
// add a empty
emp_ent := &pb.Entry{}
empent_encode := EncodeEntry(emp_ent)
return rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(INIT_LOG_INDEX), empent_encode)
empEnt := &pb.Entry{}
empentEncode := EncodeEntry(empEnt)
return rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(INIT_LOG_INDEX), empentEncode)
}

// GetFirst
@@ -214,8 +214,8 @@ func (rfLog *RaftLog) LogItemCount() int {
func (rfLog *RaftLog) Append(newEnt *pb.Entry) {
rfLog.mu.Lock()
defer rfLog.mu.Unlock()
newent_encode := EncodeEntry(newEnt)
rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(uint64(rfLog.lastIdx)+1), newent_encode)
newentEncode := EncodeEntry(newEnt)
rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(uint64(rfLog.lastIdx)+1), newentEncode)
rfLog.lastIdx += 1
}

@@ -226,18 +226,18 @@ func (rfLog *RaftLog) EraseBefore(logidx int64, withDel bool) ([]*pb.Entry, erro
rfLog.mu.Lock()
defer rfLog.mu.Unlock()
ents := []*pb.Entry{}
lastlog_id := rfLog.GetLastLogId()
firstlog_id := rfLog.GetFirstLogId()
lastlogID := rfLog.GetLastLogId()
firstlogID := rfLog.GetFirstLogId()
if withDel {
for i := firstlog_id; i < uint64(logidx); i++ {
for i := firstlogID; i < uint64(logidx); i++ {
if err := rfLog.dbEng.DeleteBytesK(EncodeRaftLogKey(i)); err != nil {
return ents, err
}
logger.ELogger().Sugar().Debugf("del log with id %d success", i)
}
rfLog.firstIdx = uint64(logidx)
}
for i := logidx; i <= int64(lastlog_id); i++ {
for i := logidx; i <= int64(lastlogID); i++ {
ents = append(ents, rfLog.GetEnt(i))
}
return ents, nil
@@ -249,7 +249,7 @@ func (rfLog *RaftLog) EraseBefore(logidx int64, withDel bool) ([]*pb.Entry, erro
func (rfLog *RaftLog) EraseAfter(logidx int64, withDel bool) []*pb.Entry {
rfLog.mu.Lock()
defer rfLog.mu.Unlock()
firstlog_id := rfLog.GetFirstLogId()
firstlogID := rfLog.GetFirstLogId()
if withDel {
for i := logidx; i <= int64(rfLog.GetLastLogId()); i++ {
if err := rfLog.dbEng.DeleteBytesK(EncodeRaftLogKey(uint64(i))); err != nil {
@@ -259,7 +259,7 @@ func (rfLog *RaftLog) EraseAfter(logidx int64, withDel bool) []*pb.Entry {
rfLog.lastIdx = uint64(logidx) - 1
}
ents := []*pb.Entry{}
for i := firstlog_id; i < uint64(logidx); i++ {
for i := firstlogID; i < uint64(logidx); i++ {
ents = append(ents, rfLog.GetEnt(int64(i)))
}
return ents
@@ -287,23 +287,23 @@ func (rfLog *RaftLog) GetEntry(idx int64) *pb.Entry {
}

func (rfLog *RaftLog) GetEnt(logidx int64) *pb.Entry {
encode_value, err := rfLog.dbEng.GetBytesValue(EncodeRaftLogKey(uint64(logidx)))
encodeValue, err := rfLog.dbEng.GetBytesValue(EncodeRaftLogKey(uint64(logidx)))
if err != nil {
logger.ELogger().Sugar().Debugf("get log entry with id %d error!", logidx)
panic(err)
}
return DecodeEntry(encode_value)
return DecodeEntry(encodeValue)
}

// EncodeRaftLogKey
// encode raft log key with perfix -> RAFTLOG_PREFIX
func EncodeRaftLogKey(idx uint64) []byte {
var out_buf bytes.Buffer
out_buf.Write(RAFTLOG_PREFIX)
var outBuf bytes.Buffer
outBuf.Write(RAFTLOG_PREFIX)
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(idx))
out_buf.Write(b)
return out_buf.Bytes()
outBuf.Write(b)
return outBuf.Bytes()
}

// DecodeRaftLogKey
@@ -333,17 +333,17 @@ func DecodeEntry(in []byte) *pb.Entry {
// EncodeRaftState
// encode RaftPersistenState to bytes sequence
func EncodeRaftState(rfState *RaftPersistenState) []byte {
var bytes_state bytes.Buffer
enc := gob.NewEncoder(&bytes_state)
var bytesState bytes.Buffer
enc := gob.NewEncoder(&bytesState)
enc.Encode(rfState)
return bytes_state.Bytes()
return bytesState.Bytes()
}

// DecodeRaftState
// decode RaftPersistenState from bytes sequence
func DecodeRaftState(in []byte) *RaftPersistenState {
dec := gob.NewDecoder(bytes.NewBuffer(in))
rf_state := RaftPersistenState{}
dec.Decode(&rf_state)
return &rf_state
rfState := RaftPersistenState{}
dec.Decode(&rfState)
return &rfState
}
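
EncodeRaftLogKey above writes the index as 8 big-endian bytes after the prefix, which is what lets SeekPrefixFirst/SeekPrefixLast recover the first and last log ids: big-endian keys sort in the same order as the indexes they encode. A roundtrip sketch (the prefix literal is a placeholder; this hunk does not show RAFTLOG_PREFIX's value):

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

var raftlogPrefix = []byte("raftlog_") // assumption: placeholder for RAFTLOG_PREFIX

// encodeRaftLogKey mirrors EncodeRaftLogKey: prefix || 8-byte big-endian index
func encodeRaftLogKey(idx uint64) []byte {
	var out bytes.Buffer
	out.Write(raftlogPrefix)
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, idx)
	out.Write(b)
	return out.Bytes()
}

// decodeRaftLogKey mirrors the index recovery done in MakePersistRaftLog
func decodeRaftLogKey(key []byte) uint64 {
	return binary.BigEndian.Uint64(key[len(raftlogPrefix):])
}

func main() {
	k1, k2 := encodeRaftLogKey(9), encodeRaftLogKey(10)
	fmt.Println(decodeRaftLogKey(k1))      // 9
	fmt.Println(bytes.Compare(k1, k2) < 0) // true: byte order matches index order
}
```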
186 changes: 93 additions & 93 deletions shardkvserver/shard_kvserver.go
@@ -79,44 +79,44 @@ type MemSnapshotDB struct {
// gid: the node's raft group id
// configServerAddr: config server addr (leader addr, need to optimized into config server peer map)
func MakeShardKVServer(peerMaps map[int]string, nodeId int64, gid int, configServerAddrs string) *ShardKV {
client_ends := []*raftcore.RaftPeerNode{}
clientEnds := []*raftcore.RaftPeerNode{}
for id, addr := range peerMaps {
new_end := raftcore.MakeRaftPeerNode(addr, uint64(id))
client_ends = append(client_ends, new_end)
newEnd := raftcore.MakeRaftPeerNode(addr, uint64(id))
clientEnds = append(clientEnds, newEnd)
}
new_apply_ch := make(chan *pb.ApplyMsg)
newApplyCh := make(chan *pb.ApplyMsg)

log_db_eng := storage_eng.EngineFactory("leveldb", "./data/log/datanode_group_"+strconv.Itoa(gid)+"_nodeid_"+strconv.Itoa(int(nodeId)))
new_rf := raftcore.MakeRaft(client_ends, nodeId, log_db_eng, new_apply_ch, 50, 150)
newdb_eng := storage_eng.EngineFactory("leveldb", "./data/db/datanode_group_"+strconv.Itoa(gid)+"_nodeid_"+strconv.Itoa(int(nodeId)))
logDbEng := storage_eng.EngineFactory("leveldb", "./data/log/datanode_group_"+strconv.Itoa(gid)+"_nodeid_"+strconv.Itoa(int(nodeId)))
newRf := raftcore.MakeRaft(clientEnds, nodeId, logDbEng, newApplyCh, 50, 150)
newdbEng := storage_eng.EngineFactory("leveldb", "./data/db/datanode_group_"+strconv.Itoa(gid)+"_nodeid_"+strconv.Itoa(int(nodeId)))

shard_kv := &ShardKV{
shardKv := &ShardKV{
dead: 0,
rf: new_rf,
applyCh: new_apply_ch,
rf: newRf,
applyCh: newApplyCh,
gid_: gid,
cvCli: metaserver.MakeMetaSvrClient(common.UN_UNSED_TID, strings.Split(configServerAddrs, ",")),
lastApplied: 0,
curConfig: metaserver.DefaultConfig(),
lastConfig: metaserver.DefaultConfig(),
stm: make(map[int]*Bucket),
dbEng: newdb_eng,
dbEng: newdbEng,
notifyChans: map[int]chan *pb.CommandResponse{},
}

shard_kv.initStm(shard_kv.dbEng)
shardKv.initStm(shardKv.dbEng)

shard_kv.curConfig = *shard_kv.cvCli.Query(-1)
shard_kv.lastConfig = *shard_kv.cvCli.Query(-1)
shardKv.curConfig = *shardKv.cvCli.Query(-1)
shardKv.lastConfig = *shardKv.cvCli.Query(-1)

shard_kv.stopApplyCh = make(chan interface{})
shardKv.stopApplyCh = make(chan interface{})

// start applier
go shard_kv.ApplingToStm(shard_kv.stopApplyCh)
go shardKv.ApplingToStm(shardKv.stopApplyCh)

go shard_kv.ConfigAction()
go shardKv.ConfigAction()

return shard_kv
return shardKv
}

// CloseApply close the stopApplyCh to stop commit entries apply
@@ -135,36 +135,36 @@ func (s *ShardKV) ConfigAction() {
logger.ELogger().Sugar().Debugf("timeout into config action")

s.mu.RLock()
can_perform_next_conf := true
canPerformNextConf := true
for _, bucket := range s.stm {
if bucket.Status != Runing {
can_perform_next_conf = false
canPerformNextConf = false
logger.ELogger().Sugar().Errorf("cano't perform next conf")
break
}
}
if can_perform_next_conf {
if canPerformNextConf {
logger.ELogger().Sugar().Debug("can perform next conf")
}
cur_conf_version := s.curConfig.Version
curConfVersion := s.curConfig.Version
s.mu.RUnlock()
if can_perform_next_conf {
next_config := s.cvCli.Query(int64(cur_conf_version) + 1)
if next_config == nil {
if canPerformNextConf {
nextConfig := s.cvCli.Query(int64(curConfVersion) + 1)
if nextConfig == nil {
continue
}
next_cf_bytes, _ := json.Marshal(next_config)
cur_cf_bytes, _ := json.Marshal(s.curConfig)
logger.ELogger().Sugar().Debugf("next config %s ", string(next_cf_bytes))
logger.ELogger().Sugar().Debugf("cur config %s ", string(cur_cf_bytes))
if next_config.Version == cur_conf_version+1 {
nextCfBytes, _ := json.Marshal(nextConfig)
curCfBytes, _ := json.Marshal(s.curConfig)
logger.ELogger().Sugar().Debugf("next config %s ", string(nextCfBytes))
logger.ELogger().Sugar().Debugf("cur config %s ", string(curCfBytes))
if nextConfig.Version == curConfVersion+1 {
req := &pb.CommandRequest{}
next_cf_bytes, _ := json.Marshal(next_config)
logger.ELogger().Sugar().Debugf("can perform next conf %s ", string(next_cf_bytes))
req.Context = next_cf_bytes
nextCfBytes, _ := json.Marshal(nextConfig)
logger.ELogger().Sugar().Debugf("can perform next conf %s ", string(nextCfBytes))
req.Context = nextCfBytes
req.OpType = pb.OpType_OpConfigChange
req_bytes, _ := json.Marshal(req)
idx, _, isLeader := s.rf.Propose(req_bytes)
reqBytes, _ := json.Marshal(req)
idx, _, isLeader := s.rf.Propose(reqBytes)
if !isLeader {
return
}
@@ -212,11 +212,11 @@ func (s *ShardKV) IsKilled() bool {
// DoCommand do client put get command
func (s *ShardKV) DoCommand(ctx context.Context, req *pb.CommandRequest) (*pb.CommandResponse, error) {

cmd_resp := &pb.CommandResponse{}
cmdResp := &pb.CommandResponse{}

if !s.CanServe(common.Key2BucketID(req.Key)) {
cmd_resp.ErrCode = common.ErrCodeWrongGroup
return cmd_resp, nil
cmdResp.ErrCode = common.ErrCodeWrongGroup
return cmdResp, nil
}
req_bytes, err := json.Marshal(req)
if err != nil {
@@ -225,9 +225,9 @@ func (s *ShardKV) DoCommand(ctx context.Context, req *pb.CommandRequest) (*pb.Co
// propose to raft
idx, _, isLeader := s.rf.Propose(req_bytes)
if !isLeader {
cmd_resp.ErrCode = common.ErrCodeWrongLeader
cmd_resp.LeaderId = s.GetRf().GetLeaderId()
return cmd_resp, nil
cmdResp.ErrCode = common.ErrCodeWrongLeader
cmdResp.LeaderId = s.GetRf().GetLeaderId()
return cmdResp, nil
}

s.mu.Lock()
@@ -237,11 +237,11 @@ func (s *ShardKV) DoCommand(ctx context.Context, req *pb.CommandRequest) (*pb.Co
select {
case res := <-ch:
if res != nil {
cmd_resp.ErrCode = common.ErrCodeNoErr
cmd_resp.Value = res.Value
cmdResp.ErrCode = common.ErrCodeNoErr
cmdResp.Value = res.Value
}
case <-time.After(metaserver.ExecTimeout):
return cmd_resp, errors.New("ExecTimeout")
return cmdResp, errors.New("ExecTimeout")
}

go func() {
@@ -250,7 +250,7 @@ func (s *ShardKV) DoCommand(ctx context.Context, req *pb.CommandRequest) (*pb.Co
s.mu.Unlock()
}()

return cmd_resp, nil
return cmdResp, nil
}

// ApplingToStm apply the commit operation to state machine
@@ -280,49 +280,49 @@ func (s *ShardKV) ApplingToStm(done <-chan interface{}) {
s.lastApplied = int(appliedMsg.CommandIndex)
logger.ELogger().Sugar().Debugf("shard_kvserver last applied %d", s.lastApplied)

cmd_resp := &pb.CommandResponse{}
cmdResp := &pb.CommandResponse{}
value := ""
var err error
switch req.OpType {
// Normal Op
case pb.OpType_OpPut:
bucket_id := common.Key2BucketID(req.Key)
if s.CanServe(bucket_id) {
logger.ELogger().Sugar().Debug("WRITE put " + req.Key + " value " + req.Value + " to bucket " + strconv.Itoa(bucket_id))
s.stm[bucket_id].Put(req.Key, req.Value)
bucketID := common.Key2BucketID(req.Key)
if s.CanServe(bucketID) {
logger.ELogger().Sugar().Debug("WRITE put " + req.Key + " value " + req.Value + " to bucket " + strconv.Itoa(bucketID))
s.stm[bucketID].Put(req.Key, req.Value)
}
case pb.OpType_OpAppend:
bucket_id := common.Key2BucketID(req.Key)
if s.CanServe(bucket_id) {
s.stm[bucket_id].Append(req.Key, req.Value)
bucketID := common.Key2BucketID(req.Key)
if s.CanServe(bucketID) {
s.stm[bucketID].Append(req.Key, req.Value)
}
case pb.OpType_OpGet:
bucket_id := common.Key2BucketID(req.Key)
if s.CanServe(bucket_id) {
value, err = s.stm[bucket_id].Get(req.Key)
logger.ELogger().Sugar().Debug("get " + req.Key + " value " + value + " from bucket " + strconv.Itoa(bucket_id))
bucketID := common.Key2BucketID(req.Key)
if s.CanServe(bucketID) {
value, err = s.stm[bucketID].Get(req.Key)
logger.ELogger().Sugar().Debug("get " + req.Key + " value " + value + " from bucket " + strconv.Itoa(bucketID))
}
cmd_resp.Value = value
cmdResp.Value = value
case pb.OpType_OpConfigChange:
next_config := &metaserver.Config{}
json.Unmarshal(req.Context, next_config)
if next_config.Version == s.curConfig.Version+1 {
nextConfig := &metaserver.Config{}
json.Unmarshal(req.Context, nextConfig)
if nextConfig.Version == s.curConfig.Version+1 {
for i := 0; i < common.NBuckets; i++ {
if s.curConfig.Buckets[i] != s.gid_ && next_config.Buckets[i] == s.gid_ {
if s.curConfig.Buckets[i] != s.gid_ && nextConfig.Buckets[i] == s.gid_ {
gid := s.curConfig.Buckets[i]
if gid != 0 {
s.stm[i].Status = Runing
}
}
if s.curConfig.Buckets[i] == s.gid_ && next_config.Buckets[i] != s.gid_ {
gid := next_config.Buckets[i]
if s.curConfig.Buckets[i] == s.gid_ && nextConfig.Buckets[i] != s.gid_ {
gid := nextConfig.Buckets[i]
if gid != 0 {
s.stm[i].Status = Stopped
}
}
}
s.lastConfig = s.curConfig
s.curConfig = *next_config
s.curConfig = *nextConfig
cf_bytes, _ := json.Marshal(s.curConfig)
logger.ELogger().Sugar().Debugf("applied config to server %s ", string(cf_bytes))
}
@@ -351,7 +351,7 @@ func (s *ShardKV) ApplingToStm(done <-chan interface{}) {
}

ch := s.getNotifyChan(int(appliedMsg.CommandIndex))
ch <- cmd_resp
ch <- cmdResp

if _, isLeader := s.rf.GetState(); isLeader && s.GetRf().LogCount() > 500 {
s.mu.Lock()
@@ -444,71 +444,71 @@ func (s *ShardKV) Snapshot(ctx context.Context, req *pb.InstallSnapshotRequest)
// rpc interface
// DoBucketsOperation hanlde bucket data get, delete and insert
func (s *ShardKV) DoBucketsOperation(ctx context.Context, req *pb.BucketOperationRequest) (*pb.BucketOperationResponse, error) {
op_resp := &pb.BucketOperationResponse{}
opResp := &pb.BucketOperationResponse{}
if _, isLeader := s.rf.GetState(); !isLeader {
return op_resp, errors.New("ErrorWrongLeader")
return opResp, errors.New("ErrorWrongLeader")
}
switch req.BucketOpType {
case pb.BucketOpType_OpGetData:
{
s.mu.RLock()
if s.curConfig.Version < int(req.ConfigVersion) {
s.mu.RUnlock()
return op_resp, errors.New("ErrNotReady")
return opResp, errors.New("ErrNotReady")
}
bucket_datas := &BucketDatasVo{}
bucket_datas.Datas = map[int]map[string]string{}
bucketDatas := &BucketDatasVo{}
bucketDatas.Datas = map[int]map[string]string{}
for _, bucketID := range req.BucketIds {
sDatas, err := s.stm[int(bucketID)].deepCopy(false)
if err != nil {
s.mu.RUnlock()
return op_resp, err
return opResp, err
}
bucket_datas.Datas[int(bucketID)] = sDatas
bucketDatas.Datas[int(bucketID)] = sDatas
}
buket_data_bytes, _ := json.Marshal(bucket_datas)
op_resp.BucketsDatas = buket_data_bytes
op_resp.ConfigVersion = req.ConfigVersion
buketDataBytes, _ := json.Marshal(bucketDatas)
opResp.BucketsDatas = buketDataBytes
opResp.ConfigVersion = req.ConfigVersion
s.mu.RUnlock()
}
case pb.BucketOpType_OpDeleteData:
{
s.mu.RLock()
if int64(s.curConfig.Version) > req.ConfigVersion {
s.mu.RUnlock()
return op_resp, nil
return opResp, nil
}
s.mu.RUnlock()
command_req := &pb.CommandRequest{}
bucket_op_req_bytes, _ := json.Marshal(req)
command_req.Context = bucket_op_req_bytes
command_req.OpType = pb.OpType_OpDeleteBuckets
command_req_bytes, _ := json.Marshal(command_req)
commandReq := &pb.CommandRequest{}
bucketOpReqBytes, _ := json.Marshal(req)
commandReq.Context = bucketOpReqBytes
commandReq.OpType = pb.OpType_OpDeleteBuckets
commandReqBytes, _ := json.Marshal(commandReq)
// async
_, _, isLeader := s.rf.Propose(command_req_bytes)
_, _, isLeader := s.rf.Propose(commandReqBytes)
if !isLeader {
return op_resp, nil
return opResp, nil
}
}
case pb.BucketOpType_OpInsertData:
{
s.mu.RLock()
if int64(s.curConfig.Version) > req.ConfigVersion {
s.mu.RUnlock()
return op_resp, nil
return opResp, nil
}
s.mu.RUnlock()
command_req := &pb.CommandRequest{}
bucket_op_req_bytes, _ := json.Marshal(req)
command_req.Context = bucket_op_req_bytes
command_req.OpType = pb.OpType_OpInsertBuckets
command_req_bytes, _ := json.Marshal(command_req)
commandReq := &pb.CommandRequest{}
bucketOpReqBytes, _ := json.Marshal(req)
commandReq.Context = bucketOpReqBytes
commandReq.OpType = pb.OpType_OpInsertBuckets
commandReqBytes, _ := json.Marshal(commandReq)
// async
_, _, isLeader := s.rf.Propose(command_req_bytes)
_, _, isLeader := s.rf.Propose(commandReqBytes)
if !isLeader {
return op_resp, nil
return opResp, nil
}
}
}
return op_resp, nil
return opResp, nil
}
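
The OpConfigChange branch above flips bucket ownership by diffing two config versions: buckets newly assigned to this gid become Runing (spelling as in the diff), buckets moving away become Stopped, and gid 0 means unassigned, so no flip happens. A compact sketch of that walk with simplified types:

```go
package main

import "fmt"

const (
	Runing  = "Runing" // spelling follows the diff
	Stopped = "Stopped"
)

// applyBucketFlips walks old vs new bucket ownership for one group id,
// mirroring the loop in the OpConfigChange case above.
func applyBucketFlips(cur, next []int, gid int, status []string) {
	for i := range cur {
		if cur[i] != gid && next[i] == gid && cur[i] != 0 {
			status[i] = Runing // bucket migrating into this group
		}
		if cur[i] == gid && next[i] != gid && next[i] != 0 {
			status[i] = Stopped // bucket migrating out of this group
		}
	}
}

func main() {
	cur := []int{1, 1, 2, 0}
	next := []int{1, 2, 1, 1}
	status := []string{"", "", "", ""}
	applyBucketFlips(cur, next, 1, status)
	// bucket 1 leaves gid 1 (Stopped), bucket 2 joins it (Runing),
	// bucket 3 came from gid 0 so its status is left alone
	fmt.Println(status)
}
```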
