diff --git a/Makefile b/Makefile index e5811074..912d21a9 100644 --- a/Makefile +++ b/Makefile @@ -29,11 +29,11 @@ default: meta_cli shard_server shard_cli meta_server image: docker build -f Dockerfile --network=host -t $(BUILDER_IMAGE) . -build_dev: +build-dev: chmod +x scripts/build_dev.sh docker run -it --rm -v $(realpath .):/eraft eraft/eraftbook:$(IMAGE_VERSION) /eraft/scripts/build_dev.sh -run_test: +run-test: chmod +x scripts/run_tests.sh docker run --name test-cli-node --network mytestnetwork --ip 172.18.0.5 -it --rm -v $(realpath .):/eraft eraft/eraftbook:$(IMAGE_VERSION) /eraft/scripts/run_tests.sh diff --git a/README.md b/README.md index b168f8c8..d0cdae77 100644 --- a/README.md +++ b/README.md @@ -117,6 +117,33 @@ eraft 中使用了 hash 分片的方法,我们将数据通过哈希算法映 它主要负责集群数据存储,一般有三台机器组成一个 raft 组,对外提供高可用的服务。 +### 在容器里面运行 + +构建镜像 +``` +make image +``` + +编译代码 +``` +make build-dev +``` + +运行 demo 集群 +``` +make run-demo +``` + +运行读写测试 +``` +make run-test +``` + +停止集群 +``` +make stop-demo +``` + ### 项目构建 构建依赖 diff --git a/cmd/metacli/metacli.go b/cmd/metacli/metacli.go index eded7178..0d3c8e63 100644 --- a/cmd/metacli/metacli.go +++ b/cmd/metacli/metacli.go @@ -49,15 +49,15 @@ func main() { sigs := make(chan os.Signal, 1) addrs := strings.Split(os.Args[1], ",") - meta_cli := metaserver.MakeMetaSvrClient(common.UN_UNSED_TID, addrs) + metaCli := metaserver.MakeMetaSvrClient(common.UN_UNSED_TID, addrs) - sig_chan := make(chan os.Signal, 1) - signal.Notify(sig_chan) + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan) go func() { sig := <-sigs fmt.Println(sig) - for _, cli := range meta_cli.GetRpcClis() { + for _, cli := range metaCli.GetRpcClis() { cli.CloseAllConn() } os.Exit(-1) @@ -69,18 +69,18 @@ func main() { gid, _ := strconv.Atoi(os.Args[3]) addr_map := make(map[int64]string) addr_map[int64(gid)] = os.Args[4] - meta_cli.Join(addr_map) + metaCli.Join(addr_map) } case "leave": { gid, _ := strconv.Atoi(os.Args[3]) - meta_cli.Leave([]int64{int64(gid)}) + metaCli.Leave([]int64{int64(gid)}) } case "query": { - last_conf := meta_cli.Query(-1) - out_bytes, _ := json.Marshal(last_conf) - raftcore.PrintDebugLog("latest configuration: " + string(out_bytes)) + lastConf := metaCli.Query(-1) + outBytes, _ := json.Marshal(lastConf) + raftcore.PrintDebugLog("latest configuration: " + string(outBytes)) } case "move": { @@ -94,7 +94,7 @@ func main() { gid, _ := strconv.Atoi(os.Args[4]) for i := start; i <= end; i++ { - meta_cli.Move(i, gid) + metaCli.Move(i, gid) } } } diff --git a/cmd/metasvr/metasvr.go b/cmd/metasvr/metasvr.go index ca0ab05f..ad7ccc0c 100644 --- a/cmd/metasvr/metasvr.go +++ b/cmd/metasvr/metasvr.go @@ -50,26 +50,26 @@ func main() { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - node_id_str := os.Args[1] - node_id, err := strconv.Atoi(node_id_str) + nodeIdStr := os.Args[1] + nodeID, err := strconv.Atoi(nodeIdStr) if err != nil { panic(err) } - meta_svr_addrs := strings.Split(os.Args[2], ",") - cf_peer_map := make(map[int]string) - for i, addr := range meta_svr_addrs { - cf_peer_map[i] = addr + metaSvrAddrs := strings.Split(os.Args[2], ",") + cfPeerMap := make(map[int]string) + for i, addr := range metaSvrAddrs { + cfPeerMap[i] = addr } - meta_svr := metaserver.MakeMetaServer(cf_peer_map, node_id) - lis, err := net.Listen("tcp", cf_peer_map[node_id]) + metaSvr := metaserver.MakeMetaServer(cfPeerMap, nodeID) + lis, err := net.Listen("tcp", cfPeerMap[nodeID]) if err != nil { fmt.Printf("failed to listen: %v", err) return } s := grpc.NewServer() - pb.RegisterRaftServiceServer(s, meta_svr) + pb.RegisterRaftServiceServer(s, metaSvr) sigChan := make(chan os.Signal, 1) @@ -78,8 +78,8 @@ func main() { go func() { sig := <-sigs fmt.Println(sig) - meta_svr.Rf.CloseEndsConn() - meta_svr.StopApply() + metaSvr.Rf.CloseEndsConn() + metaSvr.StopApply() os.Exit(-1) }() diff --git a/cmd/shardcli/shardcli.go b/cmd/shardcli/shardcli.go index 84c0e19f..01b9c416 100644 --- a/cmd/shardcli/shardcli.go +++ b/cmd/shardcli/shardcli.go @@ -47,19 +47,19 @@ func main() { } sigs := make(chan os.Signal, 1) - shard_kvcli := shardkvserver.MakeKvClient(os.Args[1]) + shardKvCli := shardkvserver.MakeKvClient(os.Args[1]) sigChan := make(chan os.Signal, 1) signal.Notify(sigChan) switch os.Args[2] { case "put": - if err := shard_kvcli.Put(os.Args[3], os.Args[4]); err != nil { + if err := shardKvCli.Put(os.Args[3], os.Args[4]); err != nil { fmt.Println("err: " + err.Error()) return } case "get": - v, err := shard_kvcli.Get(os.Args[3]) + v, err := shardKvCli.Get(os.Args[3]) if err != nil { fmt.Println("err: " + err.Error()) return @@ -69,40 +69,40 @@ func main() { gid, _ := strconv.Atoi(os.Args[3]) bidsStr := os.Args[4] bids := []int64{} - bids_strarr := strings.Split(bidsStr, ",") - for _, bidStr := range bids_strarr { + bidsStrArr := strings.Split(bidsStr, ",") + for _, bidStr := range bidsStrArr { bid, _ := strconv.Atoi(bidStr) bids = append(bids, int64(bid)) } - datas := shard_kvcli.GetBucketDatas(gid, bids) + datas := shardKvCli.GetBucketDatas(gid, bids) fmt.Println("get buckets datas: " + datas) case "delbuckets": gid, _ := strconv.Atoi(os.Args[3]) bidsStr := os.Args[4] bids := []int64{} - bids_strarr := strings.Split(bidsStr, ",") - for _, bidStr := range bids_strarr { + bidsStrArr := strings.Split(bidsStr, ",") + for _, bidStr := range bidsStrArr { bid, _ := strconv.Atoi(bidStr) bids = append(bids, int64(bid)) } - shard_kvcli.DeleteBucketDatas(gid, bids) + shardKvCli.DeleteBucketDatas(gid, bids) case "insertbucketkv": gid, _ := strconv.Atoi(os.Args[3]) bid, _ := strconv.Atoi(os.Args[4]) - bucket_datas := &shardkvserver.BucketDatasVo{} - bucket_datas.Datas = make(map[int]map[string]string) + bucketDatas := &shardkvserver.BucketDatasVo{} + bucketDatas.Datas = make(map[int]map[string]string) kv := map[string]string{os.Args[5]: os.Args[6]} - bucket_datas.Datas[bid] = kv - datas, _ := json.Marshal(bucket_datas) - shard_kvcli.InsertBucketDatas(gid, []int64{int64(bid)}, datas) + bucketDatas.Datas[bid] = kv + datas, _ := json.Marshal(bucketDatas) + shardKvCli.InsertBucketDatas(gid, []int64{int64(bid)}, datas) } go func() { sig := <-sigs fmt.Println(sig) - for _, cli := range shard_kvcli.GetCsClient().GetRpcClis() { + for _, cli := range shardKvCli.GetCsClient().GetRpcClis() { cli.CloseAllConn() } - shard_kvcli.GetRpcClient().CloseAllConn() + shardKvCli.GetRpcClient().CloseAllConn() os.Exit(-1) }() } diff --git a/cmd/shardsvr/shardsvr.go b/cmd/shardsvr/shardsvr.go index 87e9b231..ad5171d1 100644 --- a/cmd/shardsvr/shardsvr.go +++ b/cmd/shardsvr/shardsvr.go @@ -37,7 +37,6 @@ import ( "github.com/eraft-io/eraft/shardkvserver" "google.golang.org/grpc" - "google.golang.org/grpc/reflection" ) func main() { @@ -49,33 +48,33 @@ func main() { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - node_id_str := os.Args[1] - node_id, err := strconv.Atoi(node_id_str) + nodeIdStr := os.Args[1] + nodeID, err := strconv.Atoi(nodeIdStr) if err != nil { panic(err) } - gid_str := os.Args[2] - gid, err := strconv.Atoi(gid_str) + gidStr := os.Args[2] + gid, err := strconv.Atoi(gidStr) if err != nil { panic(err) } - svr_addrs := strings.Split(os.Args[4], ",") - svr_peer_map := make(map[int]string) - for i, addr := range svr_addrs { - svr_peer_map[i] = addr + svrAddrs := strings.Split(os.Args[4], ",") + svrPeerMap := make(map[int]string) + for i, addr := range svrAddrs { + svrPeerMap[i] = addr } - shard_svr := shardkvserver.MakeShardKVServer(svr_peer_map, int64(node_id), gid, os.Args[3]) - lis, err := net.Listen("tcp", svr_peer_map[node_id]) + shardSvr := shardkvserver.MakeShardKVServer(svrPeerMap, int64(nodeID), gid, os.Args[3]) + lis, err := net.Listen("tcp", svrPeerMap[nodeID]) if err != nil { fmt.Printf("failed to listen: %v", err) return } - fmt.Printf("server listen on: %s \n", svr_peer_map[node_id]) + fmt.Printf("server listen on: %s \n", svrPeerMap[nodeID]) s := grpc.NewServer() - pb.RegisterRaftServiceServer(s, shard_svr) + pb.RegisterRaftServiceServer(s, shardSvr) sigChan := make(chan os.Signal, 1) @@ -84,12 +83,11 @@ func main() { go func() { sig := <-sigs fmt.Println(sig) - shard_svr.GetRf().CloseEndsConn() - shard_svr.CloseApply() + shardSvr.GetRf().CloseEndsConn() + shardSvr.CloseApply() os.Exit(-1) }() - reflection.Register(s) err = s.Serve(lis) if err != nil { fmt.Printf("failed to serve: %v", err) diff --git a/common/common.go b/common/common.go index 55825bc0..7596432a 100644 --- a/common/common.go +++ b/common/common.go @@ -47,11 +47,11 @@ func Key2BucketID(key string) int { } func CRC32KeyHash(k string, base int) int { - bucket_id := 0 + bucketID := 0 crc32q := crc32.MakeTable(0xD5828281) sum := crc32.Checksum([]byte(k), crc32q) - bucket_id = int(sum) % NBuckets - return bucket_id + bucketID = int(sum) % NBuckets + return bucketID } func Int64ArrToIntArr(in []int64) []int { diff --git a/metaserver/client.go b/metaserver/client.go index 62a272c0..641d1a3a 100644 --- a/metaserver/client.go +++ b/metaserver/client.go @@ -40,9 +40,9 @@ import ( type MetaSvrCli struct { endpoints []*raftcore.RaftPeerNode - leaderId int64 - clientId int64 - commandId int64 + leaderID int64 + clientID int64 + commandID int64 } func nrand() int64 { @@ -51,32 +51,31 @@ func nrand() int64 { return bigx.Int64() } -func MakeMetaSvrClient(targetId uint64, targetAddrs []string) *MetaSvrCli { - - mate_svr_cli := &MetaSvrCli{ - leaderId: 0, - clientId: nrand(), - commandId: 0, +func MakeMetaSvrClient(targetID uint64, targetAddrs []string) *MetaSvrCli { + mateSvrCli := &MetaSvrCli{ + leaderID: 0, + clientID: nrand(), + commandID: 0, } for _, addr := range targetAddrs { - cli := raftcore.MakeRaftPeerNode(addr, targetId) - mate_svr_cli.endpoints = append(mate_svr_cli.endpoints, cli) + cli := raftcore.MakeRaftPeerNode(addr, targetID) + mateSvrCli.endpoints = append(mateSvrCli.endpoints, cli) } - return mate_svr_cli + return mateSvrCli } -func (meta_svr_cli *MetaSvrCli) GetRpcClis() []*raftcore.RaftPeerNode { - return meta_svr_cli.endpoints +func (metaSvrCli *MetaSvrCli) GetRpcClis() []*raftcore.RaftPeerNode { + return metaSvrCli.endpoints } -func (meta_svr_cli *MetaSvrCli) Query(ver int64) *Config { - conf_req := &pb.ConfigRequest{ +func (metaSvrCli *MetaSvrCli) Query(ver int64) *Config { + confReq := &pb.ConfigRequest{ OpType: pb.ConfigOpType_OpQuery, ConfigVersion: ver, } - resp := meta_svr_cli.CallDoConfigRpc(conf_req) + resp := metaSvrCli.CallDoConfigRpc(confReq) cf := &Config{} if resp != nil { cf.Version = int(resp.Config.ConfigVersion) @@ -92,60 +91,60 @@ func (meta_svr_cli *MetaSvrCli) Query(ver int64) *Config { return cf } -func (meta_svr_cli *MetaSvrCli) Move(bucket_id, gid int) *pb.ConfigResponse { - conf_req := &pb.ConfigRequest{ +func (metaSvrCli *MetaSvrCli) Move(bucketID, gid int) *pb.ConfigResponse { + confReq := &pb.ConfigRequest{ OpType: pb.ConfigOpType_OpMove, - BucketId: int64(bucket_id), + BucketId: int64(bucketID), Gid: int64(gid), } - return meta_svr_cli.CallDoConfigRpc(conf_req) + return metaSvrCli.CallDoConfigRpc(confReq) } -func (meta_svr_cli *MetaSvrCli) Join(servers map[int64]string) *pb.ConfigResponse { - conf_req := &pb.ConfigRequest{ +func (metaSvrCli *MetaSvrCli) Join(servers map[int64]string) *pb.ConfigResponse { + confReq := &pb.ConfigRequest{ OpType: pb.ConfigOpType_OpJoin, Servers: servers, } - return meta_svr_cli.CallDoConfigRpc(conf_req) + return metaSvrCli.CallDoConfigRpc(confReq) } -func (meta_svr_cli *MetaSvrCli) Leave(gids []int64) *pb.ConfigResponse { - conf_req := &pb.ConfigRequest{ +func (metaSvrCli *MetaSvrCli) Leave(gids []int64) *pb.ConfigResponse { + confReq := &pb.ConfigRequest{ OpType: pb.ConfigOpType_OpLeave, Gids: gids, } - return meta_svr_cli.CallDoConfigRpc(conf_req) + return metaSvrCli.CallDoConfigRpc(confReq) } -func (meta_svr_cli *MetaSvrCli) CallDoConfigRpc(req *pb.ConfigRequest) *pb.ConfigResponse { +func (metaSvrCli *MetaSvrCli) CallDoConfigRpc(req *pb.ConfigRequest) *pb.ConfigResponse { var err error - conf_resp := &pb.ConfigResponse{} - conf_resp.Config = &pb.ServerConfig{} - for _, end := range meta_svr_cli.endpoints { - conf_resp, err = (*end.GetRaftServiceCli()).DoConfig(context.Background(), req) + confResp := &pb.ConfigResponse{} + confResp.Config = &pb.ServerConfig{} + for _, end := range metaSvrCli.endpoints { + confResp, err = (*end.GetRaftServiceCli()).DoConfig(context.Background(), req) if err != nil { continue } - switch conf_resp.ErrCode { + switch confResp.ErrCode { case common.ErrCodeNoErr: - meta_svr_cli.commandId++ - return conf_resp + metaSvrCli.commandID++ + return confResp case common.ErrCodeWrongLeader: - conf_resp, err := (*meta_svr_cli.endpoints[conf_resp.LeaderId].GetRaftServiceCli()).DoConfig(context.Background(), req) + confResp, err := (*metaSvrCli.endpoints[confResp.LeaderId].GetRaftServiceCli()).DoConfig(context.Background(), req) if err != nil { logger.ELogger().Sugar().Debugf("a node in cluster is down : ", err.Error()) continue } - if conf_resp.ErrCode == common.ErrCodeNoErr { - meta_svr_cli.commandId++ - return conf_resp + if confResp.ErrCode == common.ErrCodeNoErr { + metaSvrCli.commandID++ + return confResp } - if conf_resp.ErrCode == common.ErrCodeExecTimeout { + if confResp.ErrCode == common.ErrCodeExecTimeout { logger.ELogger().Sugar().Debug("exec timeout") - return conf_resp + return confResp } - return conf_resp + return confResp } } - return conf_resp + return confResp } diff --git a/metaserver/meta.go b/metaserver/meta.go index f9f7fdf9..938c8466 100644 --- a/metaserver/meta.go +++ b/metaserver/meta.go @@ -54,11 +54,11 @@ func (cf *Config) GetGroup2Buckets() map[int][]int { const ExecTimeout = 3 * time.Second func deepCopy(groups map[int][]string) map[int][]string { - new_group := make(map[int][]string) - for gid, severs := range groups { + newGroup := make(map[int][]string) + for gID, severs := range groups { newSvrs := make([]string, len(severs)) copy(newSvrs, severs) - new_group[gid] = newSvrs + newGroup[gID] = newSvrs } - return new_group + return newGroup } diff --git a/metaserver/metaserver.go b/metaserver/metaserver.go index 50c20824..e994b5a2 100644 --- a/metaserver/metaserver.go +++ b/metaserver/metaserver.go @@ -55,29 +55,29 @@ type MetaServer struct { } func MakeMetaServer(peerMaps map[int]string, nodeId int) *MetaServer { - client_ends := []*raftcore.RaftPeerNode{} + clientEnds := []*raftcore.RaftPeerNode{} for id, addr := range peerMaps { - new_end := raftcore.MakeRaftPeerNode(addr, uint64(id)) - client_ends = append(client_ends, new_end) + newEnd := raftcore.MakeRaftPeerNode(addr, uint64(id)) + clientEnds = append(clientEnds, newEnd) } newApplyCh := make(chan *pb.ApplyMsg) - newdb_eng := storage.EngineFactory("leveldb", "./data/db/metanode_"+strconv.Itoa(nodeId)) - logdb_eng := storage.EngineFactory("leveldb", "./data/log/metanode_"+strconv.Itoa(nodeId)) + newdbEng := storage.EngineFactory("leveldb", "./data/db/metanode_"+strconv.Itoa(nodeId)) + logdbEng := storage.EngineFactory("leveldb", "./data/log/metanode_"+strconv.Itoa(nodeId)) - newRf := raftcore.MakeRaft(client_ends, int64(nodeId), logdb_eng, newApplyCh, 50, 150) - meta_server := &MetaServer{ + newRf := raftcore.MakeRaft(clientEnds, int64(nodeId), logdbEng, newApplyCh, 50, 150) + metaServer := &MetaServer{ Rf: newRf, applyCh: newApplyCh, dead: 0, - stm: NewMemConfigStm(newdb_eng), + stm: NewMemConfigStm(newdbEng), notifyChans: make(map[int]chan *pb.ConfigResponse), } - meta_server.stopApplyCh = make(chan interface{}) + metaServer.stopApplyCh = make(chan interface{}) - go meta_server.ApplingToStm(meta_server.stopApplyCh) - return meta_server + go metaServer.ApplingToStm(metaServer.stopApplyCh) + return metaServer } func (s *MetaServer) StopApply() { @@ -94,20 +94,20 @@ func (s *MetaServer) getNotifyChan(index int) chan *pb.ConfigResponse { func (s *MetaServer) DoConfig(ctx context.Context, req *pb.ConfigRequest) (*pb.ConfigResponse, error) { logger.ELogger().Sugar().Debugf("DoConfig %s", req.String()) - cmd_resp := &pb.ConfigResponse{} + cmdResp := &pb.ConfigResponse{} req_bytes, err := json.Marshal(req) if err != nil { - cmd_resp.ErrMsg = err.Error() - return cmd_resp, err + cmdResp.ErrMsg = err.Error() + return cmdResp, err } index, _, isLeader := s.Rf.Propose(req_bytes) if !isLeader { - cmd_resp.ErrMsg = "is not leader" - cmd_resp.ErrCode = common.ErrCodeWrongLeader - cmd_resp.LeaderId = s.Rf.GetLeaderId() - return cmd_resp, nil + cmdResp.ErrMsg = "is not leader" + cmdResp.ErrCode = common.ErrCodeWrongLeader + cmdResp.LeaderId = s.Rf.GetLeaderId() + return cmdResp, nil } s.mu.Lock() @@ -116,13 +116,13 @@ func (s *MetaServer) DoConfig(ctx context.Context, req *pb.ConfigRequest) (*pb.C select { case res := <-ch: - cmd_resp.Config = res.Config - cmd_resp.ErrMsg = res.ErrMsg - cmd_resp.ErrCode = common.ErrCodeNoErr + cmdResp.Config = res.Config + cmdResp.ErrMsg = res.ErrMsg + cmdResp.ErrCode = common.ErrCodeNoErr case <-time.After(ExecTimeout): - cmd_resp.ErrMsg = "server exec timeout" - cmd_resp.ErrCode = common.ErrCodeExecTimeout - return cmd_resp, errors.New("ExecTimeout") + cmdResp.ErrMsg = "server exec timeout" + cmdResp.ErrCode = common.ErrCodeExecTimeout + return cmdResp, errors.New("ExecTimeout") } go func() { @@ -131,7 +131,7 @@ func (s *MetaServer) DoConfig(ctx context.Context, req *pb.ConfigRequest) (*pb.C s.mu.Unlock() }() - return cmd_resp, nil + return cmdResp, nil } func (s *MetaServer) RequestVote(ctx context.Context, req *pb.RequestVoteRequest) (*pb.RequestVoteResponse, error) { diff --git a/metaserver/metaserver_test.go b/metaserver/metaserver_test.go index c6e0e8b3..41062e8a 100644 --- a/metaserver/metaserver_test.go +++ b/metaserver/metaserver_test.go @@ -33,22 +33,22 @@ import ( ) func TestRangeArr(t *testing.T) { - var new_buckets [common.NBuckets]int - new_buckets[0] = 2 - for k, v := range new_buckets { + var newBuckets [common.NBuckets]int + newBuckets[0] = 2 + for k, v := range newBuckets { t.Logf("k -> %d, v -> %d", k, v) } } func TestAddGroups(t *testing.T) { - new_db_eng, err := storage_eng.MakeLevelDBKvStore("./conf_data/" + "/test") + newDbEng, err := storage_eng.MakeLevelDBKvStore("./conf_data/" + "/test") if err != nil { raftcore.PrintDebugLog("boot storage engine err!") panic(err) } - mem_conf_stm := NewMemConfigStm(new_db_eng) + memConfStm := NewMemConfigStm(newDbEng) for i := 0; i < 1000; i++ { - conf, _ := mem_conf_stm.Query(-1) + conf, _ := memConfStm.Query(-1) t.Logf("%v %d", conf, i) } } diff --git a/metaserver/metastm.go b/metaserver/metastm.go index c0a2f667..68b2f6ed 100644 --- a/metaserver/metastm.go +++ b/metaserver/metastm.go @@ -40,7 +40,7 @@ const CUR_VERSION_KEY = "CUR_CONF_VERSION" type ConfigStm interface { Join(groups map[int][]string) error Leave(gids []int) error - Move(bucket_id, gid int) error + Move(bucketID, gID int) error Query(num int) (Config, error) } @@ -52,7 +52,7 @@ type MemConfigStm struct { func NewMemConfigStm(dbEng storage.KvStore) *MemConfigStm { // check if has default conf _, err := dbEng.Get(CF_PREFIX + strconv.Itoa(0)) - conf_stm := &MemConfigStm{dbEng: dbEng, curConfVersion: 0} + confStm := &MemConfigStm{dbEng: dbEng, curConfVersion: 0} if err != nil { defaultConfig := DefaultConfig() defaultConfigBytes, err := json.Marshal(defaultConfig) @@ -61,105 +61,105 @@ func NewMemConfigStm(dbEng storage.KvStore) *MemConfigStm { } // init conf logger.ELogger().Sugar().Debugf("init conf -> " + string(defaultConfigBytes)) - if err := conf_stm.dbEng.Put(CF_PREFIX+strconv.Itoa(0), string(defaultConfigBytes)); err != nil { + if err := confStm.dbEng.Put(CF_PREFIX+strconv.Itoa(0), string(defaultConfigBytes)); err != nil { panic(err) } - if err := conf_stm.dbEng.Put(CUR_VERSION_KEY, strconv.Itoa(conf_stm.curConfVersion)); err != nil { + if err := confStm.dbEng.Put(CUR_VERSION_KEY, strconv.Itoa(confStm.curConfVersion)); err != nil { panic(err) } - return conf_stm + return confStm } version_str, err := dbEng.Get(CUR_VERSION_KEY) if err != nil { panic(err) } version_int, _ := strconv.Atoi(version_str) - conf_stm.curConfVersion = version_int - return conf_stm + confStm.curConfVersion = version_int + return confStm } func (cfStm *MemConfigStm) Join(groups map[int][]string) error { - conf_bytes, err := cfStm.dbEng.Get(CF_PREFIX + strconv.Itoa(cfStm.curConfVersion)) + confBytes, err := cfStm.dbEng.Get(CF_PREFIX + strconv.Itoa(cfStm.curConfVersion)) if err != nil { return err } - last_conf := &Config{} - json.Unmarshal([]byte(conf_bytes), last_conf) - new_config := Config{cfStm.curConfVersion + 1, last_conf.Buckets, deepCopy(last_conf.Groups)} + lastConf := &Config{} + json.Unmarshal([]byte(confBytes), lastConf) + newConfig := Config{cfStm.curConfVersion + 1, lastConf.Buckets, deepCopy(lastConf.Groups)} for gid, servers := range groups { - if _, ok := new_config.Groups[gid]; !ok { + if _, ok := newConfig.Groups[gid]; !ok { newSvrs := make([]string, len(servers)) copy(newSvrs, servers) - new_config.Groups[gid] = newSvrs + newConfig.Groups[gid] = newSvrs } } - s2g := new_config.GetGroup2Buckets() + s2g := newConfig.GetGroup2Buckets() var new_buckets [common.NBuckets]int for gid, buckets := range s2g { for _, bid := range buckets { new_buckets[bid] = gid } } - new_config.Buckets = new_buckets - new_config_bytes, _ := json.Marshal(new_config) + newConfig.Buckets = new_buckets + newConfigBytes, _ := json.Marshal(newConfig) cfStm.dbEng.Put(CUR_VERSION_KEY, strconv.Itoa(cfStm.curConfVersion+1)) - cfStm.dbEng.Put(CF_PREFIX+strconv.Itoa(cfStm.curConfVersion+1), string(new_config_bytes)) + cfStm.dbEng.Put(CF_PREFIX+strconv.Itoa(cfStm.curConfVersion+1), string(newConfigBytes)) cfStm.curConfVersion += 1 return nil } func (cfStm *MemConfigStm) Leave(gids []int) error { - conf_bytes, err := cfStm.dbEng.Get(CF_PREFIX + strconv.Itoa(cfStm.curConfVersion)) + confBytes, err := cfStm.dbEng.Get(CF_PREFIX + strconv.Itoa(cfStm.curConfVersion)) if err != nil { return err } - last_conf := &Config{} - json.Unmarshal([]byte(conf_bytes), last_conf) - new_conf := Config{cfStm.curConfVersion + 1, last_conf.Buckets, deepCopy(last_conf.Groups)} + lastConf := &Config{} + json.Unmarshal([]byte(confBytes), lastConf) + newConf := Config{cfStm.curConfVersion + 1, lastConf.Buckets, deepCopy(lastConf.Groups)} for _, gid := range gids { - delete(new_conf.Groups, gid) + delete(newConf.Groups, gid) } var newBuckets [common.NBuckets]int - new_conf.Buckets = newBuckets - new_config_bytes, _ := json.Marshal(new_conf) + newConf.Buckets = newBuckets + newConfigBytes, _ := json.Marshal(newConf) cfStm.dbEng.Put(CUR_VERSION_KEY, strconv.Itoa(cfStm.curConfVersion+1)) - cfStm.dbEng.Put(CF_PREFIX+strconv.Itoa(cfStm.curConfVersion+1), string(new_config_bytes)) + cfStm.dbEng.Put(CF_PREFIX+strconv.Itoa(cfStm.curConfVersion+1), string(newConfigBytes)) cfStm.curConfVersion += 1 return nil } func (cfStm *MemConfigStm) Move(bid, gid int) error { - conf_bytes, err := cfStm.dbEng.Get(CF_PREFIX + strconv.Itoa(cfStm.curConfVersion)) + confBytes, err := cfStm.dbEng.Get(CF_PREFIX + strconv.Itoa(cfStm.curConfVersion)) if err != nil { return err } - last_conf := &Config{} - json.Unmarshal([]byte(conf_bytes), last_conf) - new_conf := Config{cfStm.curConfVersion + 1, last_conf.Buckets, deepCopy(last_conf.Groups)} - new_conf.Buckets[bid] = gid - new_config_bytes, _ := json.Marshal(new_conf) + lastConf := &Config{} + json.Unmarshal([]byte(confBytes), lastConf) + newConf := Config{cfStm.curConfVersion + 1, lastConf.Buckets, deepCopy(lastConf.Groups)} + newConf.Buckets[bid] = gid + newConfigBytes, _ := json.Marshal(newConf) cfStm.dbEng.Put(CUR_VERSION_KEY, strconv.Itoa(cfStm.curConfVersion+1)) - cfStm.dbEng.Put(CF_PREFIX+strconv.Itoa(cfStm.curConfVersion+1), string(new_config_bytes)) + cfStm.dbEng.Put(CF_PREFIX+strconv.Itoa(cfStm.curConfVersion+1), string(newConfigBytes)) cfStm.curConfVersion += 1 return nil } func (cfStm *MemConfigStm) Query(version int) (Config, error) { if version < 0 || version >= cfStm.curConfVersion { - last_conf := &Config{} + lastConf := &Config{} logger.ELogger().Sugar().Debugf("query cur version -> " + strconv.Itoa(cfStm.curConfVersion)) confBytes, err := cfStm.dbEng.Get(CF_PREFIX + strconv.Itoa(cfStm.curConfVersion)) if err != nil { return DefaultConfig(), err } - json.Unmarshal([]byte(confBytes), last_conf) - return *last_conf, nil + json.Unmarshal([]byte(confBytes), lastConf) + return *lastConf, nil } - conf_bytes, err := cfStm.dbEng.Get(CF_PREFIX + strconv.Itoa(version)) + confBytes, err := cfStm.dbEng.Get(CF_PREFIX + strconv.Itoa(version)) if err != nil { return DefaultConfig(), err } - spec_conf := &Config{} - json.Unmarshal([]byte(conf_bytes), spec_conf) - return *spec_conf, nil + specConf := &Config{} + json.Unmarshal([]byte(confBytes), specConf) + return *specConf, nil } diff --git a/raftcore/raft.go b/raftcore/raft.go index 0b5199ad..17cc1889 100644 --- a/raftcore/raft.go +++ b/raftcore/raft.go @@ -106,10 +106,10 @@ func MakeRaft(peers []*RaftPeerNode, me int64, newdbEng storage_eng.KvStore, app } rf.curTerm, rf.votedFor = rf.logs.ReadRaftState() rf.applyCond = sync.NewCond(&rf.mu) - last_log := rf.logs.GetLast() + lastLog := rf.logs.GetLast() for _, peer := range peers { logger.ELogger().Sugar().Debugf("peer addr %s id %d", peer.addr, peer.id) - rf.matchIdx[peer.id], rf.nextIdx[peer.id] = 0, int(last_log.Index+1) + rf.matchIdx[peer.id], rf.nextIdx[peer.id] = 0, int(lastLog.Index+1) if int64(peer.id) != me { rf.replicatorCond[peer.id] = sync.NewCond(&sync.Mutex{}) go rf.Replicator(peer) @@ -188,9 +188,9 @@ func (rf *Raft) HandleRequestVote(req *pb.RequestVoteRequest, resp *pb.RequestVo rf.curTerm, rf.votedFor = req.Term, -1 } - last_log := rf.logs.GetLast() + lastLog := rf.logs.GetLast() - if req.LastLogTerm < int64(last_log.Term) || (req.LastLogTerm == int64(last_log.Term) && req.LastLogIndex < last_log.Index) { + if req.LastLogTerm < int64(lastLog.Term) || (req.LastLogTerm == int64(lastLog.Term) && req.LastLogIndex < lastLog.Index) { resp.Term, resp.VoteGranted = rf.curTerm, false return } @@ -240,11 +240,11 @@ func (rf *Raft) HandleAppendEntries(req *pb.AppendEntriesRequest, resp *pb.Appen if !rf.MatchLog(req.PrevLogTerm, req.PrevLogIndex) { resp.Term = rf.curTerm resp.Success = false - last_index := rf.logs.GetLast().Index - if last_index < req.PrevLogIndex { - logger.ELogger().Sugar().Warnf("log confict with term %d, index %d", -1, last_index+1) + lastIndex := rf.logs.GetLast().Index + if lastIndex < req.PrevLogIndex { + logger.ELogger().Sugar().Warnf("log confict with term %d, index %d", -1, lastIndex+1) resp.ConflictTerm = -1 - resp.ConflictIndex = last_index + 1 + resp.ConflictIndex = lastIndex + 1 } else { first_index := rf.logs.GetFirst().Index resp.ConflictTerm = int64(rf.logs.GetEntry(req.PrevLogIndex).Term) @@ -257,10 +257,10 @@ func (rf *Raft) HandleAppendEntries(req *pb.AppendEntriesRequest, resp *pb.Appen return } - first_index := rf.logs.GetFirst().Index + firstIndex := rf.logs.GetFirst().Index for index, entry := range req.Entries { - if int(entry.Index-first_index) >= rf.logs.LogItemCount() || rf.logs.GetEntry(entry.Index).Term != entry.Term { - rf.logs.EraseAfter(entry.Index-first_index, true) + if int(entry.Index-firstIndex) >= rf.logs.LogItemCount() || rf.logs.GetEntry(entry.Index).Term != entry.Term { + rf.logs.EraseAfter(entry.Index-firstIndex, true) for _, newEnt := range req.Entries[index:] { rf.logs.Append(newEnt) } @@ -517,72 +517,77 @@ func (rf *Raft) replicateOneRound(peer *RaftPeerNode) { rf.mu.RUnlock() return } - prev_log_index := uint64(rf.nextIdx[peer.id] - 1) - logger.ELogger().Sugar().Debugf("leader prev log index %d", prev_log_index) - if prev_log_index < uint64(rf.logs.GetFirst().Index) { - first_log := rf.logs.GetFirst() + prevLogIndex := uint64(rf.nextIdx[peer.id] - 1) + logger.ELogger().Sugar().Debugf("leader prev log index %d", prevLogIndex) + if prevLogIndex < uint64(rf.logs.GetFirst().Index) { + firstLog := rf.logs.GetFirst() + + // snapShotContext, err := rf.ReadSnapshot() + // if err != nil { + // logger.ELogger().Sugar().Errorf("read snapshot data error %v", err) + // } - // TODO: send kv leveldb snapshot - snap_shot_req := &pb.InstallSnapshotRequest{ + snapShotReq := &pb.InstallSnapshotRequest{ Term: rf.curTerm, LeaderId: int64(rf.id), - LastIncludedIndex: first_log.Index, - LastIncludedTerm: int64(first_log.Term), + LastIncludedIndex: firstLog.Index, + LastIncludedTerm: int64(firstLog.Term), + // Data: snapShotContext, } rf.mu.RUnlock() - logger.ELogger().Sugar().Debugf("send snapshot to %s with %s", peer.addr, snap_shot_req.String()) + logger.ELogger().Sugar().Debugf("send snapshot to %s with %s", peer.addr, snapShotReq.String()) - snapshot_resp, err := (*peer.raftServiceCli).Snapshot(context.Background(), snap_shot_req) + snapshotResp, err := (*peer.raftServiceCli).Snapshot(context.Background(), snapShotReq) if err != nil { logger.ELogger().Sugar().Errorf("send snapshot to %s failed %v", peer.addr, err.Error()) } rf.mu.Lock() - logger.ELogger().Sugar().Debugf("send snapshot to %s with resp %s", peer.addr, snapshot_resp.String()) + logger.ELogger().Sugar().Debugf("send snapshot to %s with resp %s", peer.addr, snapshotResp.String()) - if snapshot_resp != nil && rf.role == NodeRoleLeader && - rf.curTerm == snap_shot_req.Term && snapshot_resp.Term > rf.curTerm { + if snapshotResp != nil && rf.role == NodeRoleLeader && + rf.curTerm == snapShotReq.Term && snapshotResp.Term > rf.curTerm { rf.SwitchRaftNodeRole(NodeRoleFollower) - rf.curTerm = snapshot_resp.Term + rf.curTerm = snapshotResp.Term rf.votedFor = -1 rf.PersistRaftState() rf.mu.Unlock() return } - logger.ELogger().Sugar().Debugf("set peer %d matchIdx %d", peer.id, snap_shot_req.LastIncludedIndex) - rf.matchIdx[peer.id] = int(snap_shot_req.LastIncludedIndex) - rf.nextIdx[peer.id] = int(snap_shot_req.LastIncludedIndex) + 1 + logger.ELogger().Sugar().Debugf("set peer %d matchIdx %d", peer.id, snapShotReq.LastIncludedIndex) + rf.matchIdx[peer.id] = int(snapShotReq.LastIncludedIndex) + rf.nextIdx[peer.id] = int(snapShotReq.LastIncludedIndex) + 1 rf.mu.Unlock() return } - first_index := rf.logs.GetFirst().Index - logger.ELogger().Sugar().Debugf("first log index %d", first_index) - new_ents, _ := rf.logs.EraseBefore(int64(prev_log_index)+1, false) + firstIndex := rf.logs.GetFirst().Index + logger.ELogger().Sugar().Debugf("first log index %d", firstIndex) + new_ents, _ := rf.logs.EraseBefore(int64(prevLogIndex)+1, false) entries := make([]*pb.Entry, len(new_ents)) copy(entries, new_ents) - append_ent_req := &pb.AppendEntriesRequest{ + appendEntReq := &pb.AppendEntriesRequest{ Term: rf.curTerm, LeaderId: int64(rf.id), - PrevLogIndex: int64(prev_log_index), - PrevLogTerm: int64(rf.logs.GetEntry(int64(prev_log_index)).Term), + PrevLogIndex: int64(prevLogIndex), + PrevLogTerm: int64(rf.logs.GetEntry(int64(prevLogIndex)).Term), Entries: entries, LeaderCommit: rf.commitIdx, } rf.mu.RUnlock() // send empty ae to peers - resp, err := (*peer.raftServiceCli).AppendEntries(context.Background(), append_ent_req) + resp, err := (*peer.raftServiceCli).AppendEntries(context.Background(), appendEntReq) if err != nil { logger.ELogger().Sugar().Errorf("send append entries to %s failed %v\n", peer.addr, err.Error()) } - if rf.role == NodeRoleLeader && rf.curTerm == append_ent_req.Term && resp != nil && resp.Success { + if rf.role == NodeRoleLeader && rf.curTerm == appendEntReq.Term && resp != nil && resp.Success { // deal with appendRnt resp logger.ELogger().Sugar().Debugf("send heart beat to %s success", peer.addr) - rf.matchIdx[peer.id] = int(append_ent_req.PrevLogIndex) + len(append_ent_req.Entries) + rf.matchIdx[peer.id] = int(appendEntReq.PrevLogIndex) + len(appendEntReq.Entries) rf.nextIdx[peer.id] = rf.matchIdx[peer.id] + 1 rf.advanceCommitIndexForLeader() return @@ -600,7 +605,7 @@ func (rf *Raft) replicateOneRound(peer *RaftPeerNode) { if resp.Term == rf.curTerm { rf.nextIdx[peer.id] = int(resp.ConflictIndex) if resp.ConflictTerm != -1 { - for i := append_ent_req.PrevLogIndex; i >= int64(first_index); i-- { + for i := appendEntReq.PrevLogIndex; i >= int64(firstIndex); i-- { if rf.logs.GetEntry(i).Term == uint64(resp.ConflictTerm) { rf.nextIdx[peer.id] = int(i + 1) break @@ -620,10 +625,10 @@ func (rf *Raft) Applier() { rf.applyCond.Wait() } - commit_index, last_applied := rf.commitIdx, rf.lastApplied - entries := make([]*pb.Entry, commit_index-last_applied) - copy(entries, rf.logs.GetRange(last_applied+1, commit_index)) - logger.ELogger().Sugar().Debugf("%d, applies entries %d-%d in term %d", rf.id, rf.lastApplied, commit_index, rf.curTerm) + commitIndex, lastApplied := rf.commitIdx, rf.lastApplied + entries := make([]*pb.Entry, commitIndex-lastApplied) + copy(entries, rf.logs.GetRange(lastApplied+1, commitIndex)) + logger.ELogger().Sugar().Debugf("%d, applies entries %d-%d in term %d", rf.id, rf.lastApplied, commitIndex, rf.curTerm) rf.mu.Unlock() for _, entry := range entries { @@ -636,25 +641,25 @@ func (rf *Raft) Applier() { } rf.mu.Lock() - rf.lastApplied = int64(Max(int(rf.lastApplied), int(commit_index))) + rf.lastApplied = int64(Max(int(rf.lastApplied), int(commitIndex))) rf.mu.Unlock() } } -func (rf *Raft) Snapshot(snap_idx uint64, snapshotContext []byte) error { +func (rf *Raft) Snapshot(snapIdx uint64, snapshotContext []byte) error { rf.mu.Lock() defer rf.mu.Unlock() rf.isSnapshoting = true - if snap_idx <= rf.logs.GetFirstLogId() { + if snapIdx <= rf.logs.GetFirstLogId() { rf.isSnapshoting = false return errors.New("ety index is larger than the first log index") } - _, err := rf.logs.EraseBefore(int64(snap_idx), true) + _, err := rf.logs.EraseBefore(int64(snapIdx), true) if err != nil { rf.isSnapshoting = false return err } - if err := rf.logs.ResetFirstLogEntry(rf.curTerm, int64(snap_idx)); err != nil { + if err := rf.logs.ResetFirstLogEntry(rf.curTerm, int64(snapIdx)); err != nil { rf.isSnapshoting = false return err } diff --git a/raftcore/raft_peer_node.go b/raftcore/raft_peer_node.go index 468f2735..bc876b5b 100644 --- a/raftcore/raft_peer_node.go +++ b/raftcore/raft_peer_node.go @@ -52,12 +52,12 @@ func MakeRaftPeerNode(addr string, id uint64) *RaftPeerNode { } conns := []*grpc.ClientConn{} conns = append(conns, conn) - rpc_client := raftpb.NewRaftServiceClient(conn) + rpcClient := raftpb.NewRaftServiceClient(conn) return &RaftPeerNode{ id: id, addr: addr, conns: conns, - raftServiceCli: &rpc_client, + raftServiceCli: &rpcClient, } } diff --git a/raftcore/raft_persistent_log.go b/raftcore/raft_persistent_log.go index 390addb4..b48ac311 100644 --- a/raftcore/raft_persistent_log.go +++ b/raftcore/raft_persistent_log.go @@ -55,9 +55,9 @@ func MakePersistRaftLog(newdbEng storage_eng.KvStore) *RaftLog { _, readBootstrapStateErr := newdbEng.GetBytesValue(BootstrapStateKey) if err != nil && readBootstrapStateErr != nil { logger.ELogger().Sugar().Debugf("init raft log state") - emp_ent := &pb.Entry{} - emp_ent_encode := EncodeEntry(emp_ent) - if err := newdbEng.PutBytesKv(EncodeRaftLogKey(INIT_LOG_INDEX), emp_ent_encode); err != nil { + empEnt := &pb.Entry{} + empEntEncode := EncodeEntry(empEnt) + if err := newdbEng.PutBytesKv(EncodeRaftLogKey(INIT_LOG_INDEX), empEntEncode); err != nil { panic(err.Error()) } if err := newdbEng.PutBytesKv(BootstrapStateKey, []byte{}); err != nil { @@ -69,13 +69,13 @@ func MakePersistRaftLog(newdbEng storage_eng.KvStore) *RaftLog { if err != nil { panic(err) } - last_idx := binary.BigEndian.Uint64(lidkBytes[len(RAFTLOG_PREFIX):]) + lastIdx := binary.BigEndian.Uint64(lidkBytes[len(RAFTLOG_PREFIX):]) fidkBytes, _, err := newdbEng.SeekPrefixFirst(RAFTLOG_PREFIX) if err != nil { panic(err) } - first_idx := binary.BigEndian.Uint64(fidkBytes[len(RAFTLOG_PREFIX):]) - return &RaftLog{dbEng: newdbEng, lastIdx: last_idx, firstIdx: first_idx} + firstIdx := binary.BigEndian.Uint64(fidkBytes[len(RAFTLOG_PREFIX):]) + return &RaftLog{dbEng: newdbEng, lastIdx: lastIdx, firstIdx: firstIdx} } // PersistRaftState Persistent storage raft state @@ -84,22 +84,22 @@ func MakePersistRaftLog(newdbEng storage_eng.KvStore) *RaftLog { func (rfLog *RaftLog) PersistRaftState(curTerm int64, votedFor int64) { rfLog.mu.Lock() defer rfLog.mu.Unlock() - rf_state := &RaftPersistenState{ + rfState := &RaftPersistenState{ CurTerm: curTerm, VotedFor: votedFor, } - rfLog.dbEng.PutBytesKv(RAFT_STATE_KEY, EncodeRaftState(rf_state)) + rfLog.dbEng.PutBytesKv(RAFT_STATE_KEY, EncodeRaftState(rfState)) } // ReadRaftState // read the persist curTerm, votedFor for node from storage engine func (rfLog *RaftLog) ReadRaftState() (curTerm int64, votedFor int64) { - rf_bytes, err := rfLog.dbEng.GetBytesValue(RAFT_STATE_KEY) + rfBytes, err := rfLog.dbEng.GetBytesValue(RAFT_STATE_KEY) if err != nil { return 0, -1 } - rf_state := DecodeRaftState(rf_bytes) - return rf_state.CurTerm, rf_state.VotedFor + rfState := DecodeRaftState(rfBytes) + return rfState.CurTerm, rfState.VotedFor } // PersistSnapshot ... @@ -136,28 +136,28 @@ func (rfLog *RaftLog) GetLastLogId() uint64 { func (rfLog *RaftLog) SetEntFirstData(d []byte) error { rfLog.mu.Lock() defer rfLog.mu.Unlock() - first_idx := rfLog.GetFirstLogId() - encode_value, err := rfLog.dbEng.GetBytesValue(EncodeRaftLogKey(uint64(first_idx))) + firstIdx := rfLog.GetFirstLogId() + encodeValue, err := rfLog.dbEng.GetBytesValue(EncodeRaftLogKey(uint64(firstIdx))) if err != nil { - logger.ELogger().Sugar().Panicf("get log entry with id %d error!", first_idx) + logger.ELogger().Sugar().Panicf("get log entry with id %d error!", firstIdx) panic(err) } - ent := DecodeEntry(encode_value) - ent.Index = int64(first_idx) + ent := DecodeEntry(encodeValue) + ent.Index = int64(firstIdx) ent.Data = d - newent_encode := EncodeEntry(ent) - return rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(first_idx), newent_encode) + newentEncode := EncodeEntry(ent) + return rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(firstIdx), newentEncode) } func (rfLog *RaftLog) ResetFirstLogEntry(term int64, index int64) error { rfLog.mu.Lock() defer rfLog.mu.Unlock() - new_ent := &pb.Entry{} - new_ent.EntryType = pb.EntryType_EntryNormal - new_ent.Term = uint64(term) - new_ent.Index = index - newent_encode := EncodeEntry(new_ent) - if err := rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(uint64(index)), newent_encode); err != nil { + newEnt := &pb.Entry{} + newEnt.EntryType = pb.EntryType_EntryNormal + newEnt.Term = uint64(term) + newEnt.Index = index + newentEncode := EncodeEntry(newEnt) + if err := rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(uint64(index)), newentEncode); err != nil { return err } rfLog.firstIdx = uint64(index) @@ -176,9 +176,9 @@ func (rfLog *RaftLog) ReInitLogs() error { rfLog.firstIdx = 0 rfLog.lastIdx = 0 // add a empty - emp_ent := &pb.Entry{} - empent_encode := EncodeEntry(emp_ent) - return rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(INIT_LOG_INDEX), empent_encode) + empEnt := &pb.Entry{} + empentEncode := EncodeEntry(empEnt) + return rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(INIT_LOG_INDEX), empentEncode) } // GetFirst @@ -214,8 +214,8 @@ func (rfLog *RaftLog) LogItemCount() int { func (rfLog *RaftLog) Append(newEnt *pb.Entry) { rfLog.mu.Lock() defer rfLog.mu.Unlock() - newent_encode := EncodeEntry(newEnt) - rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(uint64(rfLog.lastIdx)+1), newent_encode) + newentEncode := EncodeEntry(newEnt) + rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(uint64(rfLog.lastIdx)+1), newentEncode) rfLog.lastIdx += 1 } @@ -226,10 +226,10 @@ func (rfLog *RaftLog) EraseBefore(logidx int64, withDel bool) ([]*pb.Entry, erro rfLog.mu.Lock() defer rfLog.mu.Unlock() ents := []*pb.Entry{} - lastlog_id := rfLog.GetLastLogId() - firstlog_id := rfLog.GetFirstLogId() + lastlogID := rfLog.GetLastLogId() + firstlogID := rfLog.GetFirstLogId() if withDel { - for i := firstlog_id; i < uint64(logidx); i++ { + for i := firstlogID; i < uint64(logidx); i++ { if err := rfLog.dbEng.DeleteBytesK(EncodeRaftLogKey(i)); err != nil { return ents, err } @@ -237,7 +237,7 @@ func (rfLog *RaftLog) EraseBefore(logidx int64, withDel bool) ([]*pb.Entry, erro } rfLog.firstIdx = uint64(logidx) } - for i := logidx; i <= int64(lastlog_id); i++ { + for i := logidx; i <= int64(lastlogID); i++ { ents = append(ents, rfLog.GetEnt(i)) } return ents, nil @@ -249,7 +249,7 @@ func (rfLog *RaftLog) EraseBefore(logidx int64, withDel bool) ([]*pb.Entry, erro func (rfLog *RaftLog) EraseAfter(logidx int64, withDel bool) []*pb.Entry { rfLog.mu.Lock() defer rfLog.mu.Unlock() - firstlog_id := rfLog.GetFirstLogId() + firstlogID := rfLog.GetFirstLogId() if withDel { for i := logidx; i <= int64(rfLog.GetLastLogId()); i++ { if err := rfLog.dbEng.DeleteBytesK(EncodeRaftLogKey(uint64(i))); err != nil { @@ -259,7 +259,7 @@ func (rfLog *RaftLog) EraseAfter(logidx int64, withDel bool) []*pb.Entry { rfLog.lastIdx = uint64(logidx) - 1 } ents := []*pb.Entry{} - for i := firstlog_id; i < uint64(logidx); i++ { + for i := firstlogID; i < uint64(logidx); i++ { ents = append(ents, rfLog.GetEnt(int64(i))) } return ents @@ -287,23 +287,23 @@ func (rfLog *RaftLog) GetEntry(idx int64) *pb.Entry { } func (rfLog *RaftLog) GetEnt(logidx int64) *pb.Entry { - encode_value, err := rfLog.dbEng.GetBytesValue(EncodeRaftLogKey(uint64(logidx))) + encodeValue, err := rfLog.dbEng.GetBytesValue(EncodeRaftLogKey(uint64(logidx))) if err != nil { logger.ELogger().Sugar().Debugf("get log entry with id %d error!", logidx) panic(err) } - return DecodeEntry(encode_value) + return DecodeEntry(encodeValue) } // EncodeRaftLogKey // encode raft log key with perfix -> RAFTLOG_PREFIX func EncodeRaftLogKey(idx uint64) []byte { - var out_buf bytes.Buffer - out_buf.Write(RAFTLOG_PREFIX) + var outBuf bytes.Buffer + outBuf.Write(RAFTLOG_PREFIX) b := make([]byte, 8) binary.BigEndian.PutUint64(b, uint64(idx)) - out_buf.Write(b) - return out_buf.Bytes() + outBuf.Write(b) + return outBuf.Bytes() } // DecodeRaftLogKey @@ -333,17 +333,17 @@ func DecodeEntry(in []byte) *pb.Entry { // EncodeRaftState // encode RaftPersistenState to bytes sequence func EncodeRaftState(rfState *RaftPersistenState) []byte { - var bytes_state bytes.Buffer - enc := gob.NewEncoder(&bytes_state) + var bytesState bytes.Buffer + enc := gob.NewEncoder(&bytesState) enc.Encode(rfState) - return bytes_state.Bytes() + return bytesState.Bytes() } // DecodeRaftState // decode RaftPersistenState from bytes sequence func DecodeRaftState(in []byte) *RaftPersistenState { dec := gob.NewDecoder(bytes.NewBuffer(in)) - rf_state := RaftPersistenState{} - dec.Decode(&rf_state) - return &rf_state + rfState := RaftPersistenState{} + dec.Decode(&rfState) + return &rfState } diff --git a/shardkvserver/shard_kvserver.go b/shardkvserver/shard_kvserver.go index 00f0d37b..135a317e 100644 --- a/shardkvserver/shard_kvserver.go +++ b/shardkvserver/shard_kvserver.go @@ -79,44 +79,44 @@ type MemSnapshotDB struct { // gid: the node's raft group id // configServerAddr: config server addr (leader addr, need to optimized into config server peer map) func MakeShardKVServer(peerMaps map[int]string, nodeId int64, gid int, configServerAddrs string) *ShardKV { - client_ends := []*raftcore.RaftPeerNode{} + clientEnds := []*raftcore.RaftPeerNode{} for id, addr := range peerMaps { - new_end := raftcore.MakeRaftPeerNode(addr, uint64(id)) - client_ends = append(client_ends, new_end) + newEnd := raftcore.MakeRaftPeerNode(addr, uint64(id)) + clientEnds = append(clientEnds, newEnd) } - new_apply_ch := make(chan *pb.ApplyMsg) + newApplyCh := make(chan *pb.ApplyMsg) - log_db_eng := storage_eng.EngineFactory("leveldb", "./data/log/datanode_group_"+strconv.Itoa(gid)+"_nodeid_"+strconv.Itoa(int(nodeId))) - new_rf := raftcore.MakeRaft(client_ends, nodeId, log_db_eng, new_apply_ch, 50, 150) - newdb_eng := storage_eng.EngineFactory("leveldb", "./data/db/datanode_group_"+strconv.Itoa(gid)+"_nodeid_"+strconv.Itoa(int(nodeId))) + logDbEng := storage_eng.EngineFactory("leveldb", "./data/log/datanode_group_"+strconv.Itoa(gid)+"_nodeid_"+strconv.Itoa(int(nodeId))) + newRf := raftcore.MakeRaft(clientEnds, nodeId, logDbEng, newApplyCh, 50, 150) + newdbEng := storage_eng.EngineFactory("leveldb", "./data/db/datanode_group_"+strconv.Itoa(gid)+"_nodeid_"+strconv.Itoa(int(nodeId))) - shard_kv := &ShardKV{ + shardKv := &ShardKV{ dead: 0, - rf: new_rf, - applyCh: new_apply_ch, + rf: newRf, + applyCh: newApplyCh, gid_: gid, cvCli: metaserver.MakeMetaSvrClient(common.UN_UNSED_TID, strings.Split(configServerAddrs, ",")), lastApplied: 0, curConfig: metaserver.DefaultConfig(), lastConfig: metaserver.DefaultConfig(), stm: make(map[int]*Bucket), - dbEng: newdb_eng, + dbEng: newdbEng, notifyChans: map[int]chan *pb.CommandResponse{}, } - shard_kv.initStm(shard_kv.dbEng) + shardKv.initStm(shardKv.dbEng) - shard_kv.curConfig = *shard_kv.cvCli.Query(-1) - shard_kv.lastConfig = *shard_kv.cvCli.Query(-1) + shardKv.curConfig = *shardKv.cvCli.Query(-1) + shardKv.lastConfig = *shardKv.cvCli.Query(-1) - shard_kv.stopApplyCh = make(chan interface{}) + shardKv.stopApplyCh = make(chan interface{}) // start applier - go shard_kv.ApplingToStm(shard_kv.stopApplyCh) + go shardKv.ApplingToStm(shardKv.stopApplyCh) - go shard_kv.ConfigAction() + go shardKv.ConfigAction() - return shard_kv + return shardKv } // CloseApply close the stopApplyCh to stop commit entries apply @@ -135,36 +135,36 @@ func (s *ShardKV) ConfigAction() { logger.ELogger().Sugar().Debugf("timeout into config action") s.mu.RLock() - can_perform_next_conf := true + canPerformNextConf := true for _, bucket := range s.stm { if bucket.Status != Runing { - can_perform_next_conf = false + canPerformNextConf = false logger.ELogger().Sugar().Errorf("cano't perform next conf") break } } - if can_perform_next_conf { + if canPerformNextConf { logger.ELogger().Sugar().Debug("can perform next conf") } - cur_conf_version := s.curConfig.Version + curConfVersion := s.curConfig.Version s.mu.RUnlock() - if can_perform_next_conf { - next_config := s.cvCli.Query(int64(cur_conf_version) + 1) - if next_config == nil { + if canPerformNextConf { + nextConfig := s.cvCli.Query(int64(curConfVersion) + 1) + if nextConfig == nil { continue } - next_cf_bytes, _ := json.Marshal(next_config) - cur_cf_bytes, _ := json.Marshal(s.curConfig) - logger.ELogger().Sugar().Debugf("next config %s ", string(next_cf_bytes)) - logger.ELogger().Sugar().Debugf("cur config %s ", string(cur_cf_bytes)) - if next_config.Version == cur_conf_version+1 { + nextCfBytes, _ := json.Marshal(nextConfig) + curCfBytes, _ := json.Marshal(s.curConfig) + logger.ELogger().Sugar().Debugf("next config %s ", string(nextCfBytes)) + logger.ELogger().Sugar().Debugf("cur config %s ", string(curCfBytes)) + if nextConfig.Version == curConfVersion+1 { req := &pb.CommandRequest{} - next_cf_bytes, _ := json.Marshal(next_config) - logger.ELogger().Sugar().Debugf("can perform next conf %s ", string(next_cf_bytes)) - req.Context = next_cf_bytes + nextCfBytes, _ := json.Marshal(nextConfig) + logger.ELogger().Sugar().Debugf("can perform next conf %s ", string(nextCfBytes)) + req.Context = nextCfBytes req.OpType = pb.OpType_OpConfigChange - req_bytes, _ := json.Marshal(req) - idx, _, isLeader := s.rf.Propose(req_bytes) + reqBytes, _ := json.Marshal(req) + idx, _, isLeader := s.rf.Propose(reqBytes) if !isLeader { return } @@ -212,11 +212,11 @@ func (s *ShardKV) IsKilled() bool { // DoCommand do client put get command func (s *ShardKV) DoCommand(ctx context.Context, req *pb.CommandRequest) (*pb.CommandResponse, error) { - cmd_resp := &pb.CommandResponse{} + cmdResp := &pb.CommandResponse{} if !s.CanServe(common.Key2BucketID(req.Key)) { - cmd_resp.ErrCode = common.ErrCodeWrongGroup - return cmd_resp, nil + cmdResp.ErrCode = common.ErrCodeWrongGroup + return cmdResp, nil } req_bytes, err := json.Marshal(req) if err != nil { @@ -225,9 +225,9 @@ func (s *ShardKV) DoCommand(ctx context.Context, req *pb.CommandRequest) (*pb.Co // propose to raft idx, _, isLeader := s.rf.Propose(req_bytes) if !isLeader { - cmd_resp.ErrCode = common.ErrCodeWrongLeader - cmd_resp.LeaderId = s.GetRf().GetLeaderId() - return cmd_resp, nil + cmdResp.ErrCode = common.ErrCodeWrongLeader + cmdResp.LeaderId = s.GetRf().GetLeaderId() + return cmdResp, nil } s.mu.Lock() @@ -237,11 +237,11 @@ func (s *ShardKV) DoCommand(ctx context.Context, req *pb.CommandRequest) (*pb.Co select { case res := <-ch: if res != nil { - cmd_resp.ErrCode = common.ErrCodeNoErr - cmd_resp.Value = res.Value + cmdResp.ErrCode = common.ErrCodeNoErr + cmdResp.Value = res.Value } case <-time.After(metaserver.ExecTimeout): - return cmd_resp, errors.New("ExecTimeout") + return cmdResp, errors.New("ExecTimeout") } go func() { @@ -250,7 +250,7 @@ func (s *ShardKV) DoCommand(ctx context.Context, req *pb.CommandRequest) (*pb.Co s.mu.Unlock() }() - return cmd_resp, nil + return cmdResp, nil } // ApplingToStm apply the commit operation to state machine @@ -280,49 +280,49 @@ func (s *ShardKV) ApplingToStm(done <-chan interface{}) { s.lastApplied = int(appliedMsg.CommandIndex) logger.ELogger().Sugar().Debugf("shard_kvserver last applied %d", s.lastApplied) - cmd_resp := &pb.CommandResponse{} + cmdResp := &pb.CommandResponse{} value := "" var err error switch req.OpType { // Normal Op case pb.OpType_OpPut: - bucket_id := common.Key2BucketID(req.Key) - if s.CanServe(bucket_id) { - logger.ELogger().Sugar().Debug("WRITE put " + req.Key + " value " + req.Value + " to bucket " + strconv.Itoa(bucket_id)) - s.stm[bucket_id].Put(req.Key, req.Value) + bucketID := common.Key2BucketID(req.Key) + if s.CanServe(bucketID) { + logger.ELogger().Sugar().Debug("WRITE put " + req.Key + " value " + req.Value + " to bucket " + strconv.Itoa(bucketID)) + s.stm[bucketID].Put(req.Key, req.Value) } case pb.OpType_OpAppend: - bucket_id := common.Key2BucketID(req.Key) - if s.CanServe(bucket_id) { - s.stm[bucket_id].Append(req.Key, req.Value) + bucketID := common.Key2BucketID(req.Key) + if s.CanServe(bucketID) { + s.stm[bucketID].Append(req.Key, req.Value) } case pb.OpType_OpGet: - bucket_id := common.Key2BucketID(req.Key) - if s.CanServe(bucket_id) { - value, err = s.stm[bucket_id].Get(req.Key) - logger.ELogger().Sugar().Debug("get " + req.Key + " value " + value + " from bucket " + strconv.Itoa(bucket_id)) + bucketID := common.Key2BucketID(req.Key) + if s.CanServe(bucketID) { + value, err = s.stm[bucketID].Get(req.Key) + logger.ELogger().Sugar().Debug("get " + req.Key + " value " + value + " from bucket " + strconv.Itoa(bucketID)) } - cmd_resp.Value = value + cmdResp.Value = value case pb.OpType_OpConfigChange: - next_config := &metaserver.Config{} - json.Unmarshal(req.Context, next_config) - if next_config.Version == s.curConfig.Version+1 { + nextConfig := &metaserver.Config{} + json.Unmarshal(req.Context, nextConfig) + if nextConfig.Version == s.curConfig.Version+1 { for i := 0; i < common.NBuckets; i++ { - if s.curConfig.Buckets[i] != s.gid_ && next_config.Buckets[i] == s.gid_ { + if s.curConfig.Buckets[i] != s.gid_ && nextConfig.Buckets[i] == s.gid_ { gid := s.curConfig.Buckets[i] if gid != 0 { s.stm[i].Status = Runing } } - if s.curConfig.Buckets[i] == s.gid_ && next_config.Buckets[i] != s.gid_ { - gid := next_config.Buckets[i] + if s.curConfig.Buckets[i] == s.gid_ && nextConfig.Buckets[i] != s.gid_ { + gid := nextConfig.Buckets[i] if gid != 0 { s.stm[i].Status = Stopped } } } s.lastConfig = s.curConfig - s.curConfig = *next_config + s.curConfig = *nextConfig cf_bytes, _ := json.Marshal(s.curConfig) logger.ELogger().Sugar().Debugf("applied config to server %s ", string(cf_bytes)) } @@ -351,7 +351,7 @@ func (s *ShardKV) ApplingToStm(done <-chan interface{}) { } ch := s.getNotifyChan(int(appliedMsg.CommandIndex)) - ch <- cmd_resp + ch <- cmdResp if _, isLeader := s.rf.GetState(); isLeader && s.GetRf().LogCount() > 500 { s.mu.Lock() @@ -444,9 +444,9 @@ func (s *ShardKV) Snapshot(ctx context.Context, req *pb.InstallSnapshotRequest) // rpc interface // DoBucketsOperation hanlde bucket data get, delete and insert func (s *ShardKV) DoBucketsOperation(ctx context.Context, req *pb.BucketOperationRequest) (*pb.BucketOperationResponse, error) { - op_resp := &pb.BucketOperationResponse{} + opResp := &pb.BucketOperationResponse{} if _, isLeader := s.rf.GetState(); !isLeader { - return op_resp, errors.New("ErrorWrongLeader") + return opResp, errors.New("ErrorWrongLeader") } switch req.BucketOpType { case pb.BucketOpType_OpGetData: @@ -454,21 +454,21 @@ func (s *ShardKV) DoBucketsOperation(ctx context.Context, req *pb.BucketOperatio s.mu.RLock() if s.curConfig.Version < int(req.ConfigVersion) { s.mu.RUnlock() - return op_resp, errors.New("ErrNotReady") + return opResp, errors.New("ErrNotReady") } - bucket_datas := &BucketDatasVo{} - bucket_datas.Datas = map[int]map[string]string{} + bucketDatas := &BucketDatasVo{} + bucketDatas.Datas = map[int]map[string]string{} for _, bucketID := range req.BucketIds { sDatas, err := s.stm[int(bucketID)].deepCopy(false) if err != nil { s.mu.RUnlock() - return op_resp, err + return opResp, err } - bucket_datas.Datas[int(bucketID)] = sDatas + bucketDatas.Datas[int(bucketID)] = sDatas } - buket_data_bytes, _ := json.Marshal(bucket_datas) - op_resp.BucketsDatas = buket_data_bytes - op_resp.ConfigVersion = req.ConfigVersion + buketDataBytes, _ := json.Marshal(bucketDatas) + opResp.BucketsDatas = buketDataBytes + opResp.ConfigVersion = req.ConfigVersion s.mu.RUnlock() } case pb.BucketOpType_OpDeleteData: @@ -476,18 +476,18 @@ func (s *ShardKV) DoBucketsOperation(ctx context.Context, req *pb.BucketOperatio s.mu.RLock() if int64(s.curConfig.Version) > req.ConfigVersion { s.mu.RUnlock() - return op_resp, nil + return opResp, nil } s.mu.RUnlock() - command_req := &pb.CommandRequest{} - bucket_op_req_bytes, _ := json.Marshal(req) - command_req.Context = bucket_op_req_bytes - command_req.OpType = pb.OpType_OpDeleteBuckets - command_req_bytes, _ := json.Marshal(command_req) + commandReq := &pb.CommandRequest{} + bucketOpReqBytes, _ := json.Marshal(req) + commandReq.Context = bucketOpReqBytes + commandReq.OpType = pb.OpType_OpDeleteBuckets + commandReqBytes, _ := json.Marshal(commandReq) // async - _, _, isLeader := s.rf.Propose(command_req_bytes) + _, _, isLeader := s.rf.Propose(commandReqBytes) if !isLeader { - return op_resp, nil + return opResp, nil } } case pb.BucketOpType_OpInsertData: @@ -495,20 +495,20 @@ func (s *ShardKV) DoBucketsOperation(ctx context.Context, req *pb.BucketOperatio s.mu.RLock() if int64(s.curConfig.Version) > req.ConfigVersion { s.mu.RUnlock() - return op_resp, nil + return opResp, nil } s.mu.RUnlock() - command_req := &pb.CommandRequest{} - bucket_op_req_bytes, _ := json.Marshal(req) - command_req.Context = bucket_op_req_bytes - command_req.OpType = pb.OpType_OpInsertBuckets - command_req_bytes, _ := json.Marshal(command_req) + commandReq := &pb.CommandRequest{} + bucketOpReqBytes, _ := json.Marshal(req) + commandReq.Context = bucketOpReqBytes + commandReq.OpType = pb.OpType_OpInsertBuckets + commandReqBytes, _ := json.Marshal(commandReq) // async - _, _, isLeader := s.rf.Propose(command_req_bytes) + _, _, isLeader := s.rf.Propose(commandReqBytes) if !isLeader { - return op_resp, nil + return opResp, nil } } } - return op_resp, nil + return opResp, nil }