Skip to content

Commit

Permalink
COMPLETE project2C & PASS
Browse files Browse the repository at this point in the history
  • Loading branch information
Metafora072 committed Jul 19, 2024
1 parent 28e1760 commit 1c88ea4
Show file tree
Hide file tree
Showing 18 changed files with 414 additions and 35 deletions.
44 changes: 44 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,36 @@ project2b:
$(GOTEST) ./kv/test_raftstore -run ^TestPersistPartition2B$ || true
$(GOTEST) ./kv/test_raftstore -run ^TestPersistPartitionUnreliable2B$ || true
$(TEST_CLEAN)
#project2b:
# $(TEST_CLEAN)
# $(GOTEST) -v ./kv/test_raftstore -run ^TestBasic2B$ | grep PASS || true
# $(GOTEST) -v ./kv/test_raftstore -run ^TestConcurrent2B$ | grep PASS || true
# $(GOTEST) -v ./kv/test_raftstore -run ^TestUnreliable2B$ | grep PASS || true
# $(GOTEST) -v ./kv/test_raftstore -run ^TestOnePartition2B$ | grep PASS || true
# $(GOTEST) -v ./kv/test_raftstore -run ^TestManyPartitionsOneClient2B$ | grep PASS || true
# $(GOTEST) -v ./kv/test_raftstore -run ^TestManyPartitionsManyClients2B$ | grep PASS || true
# $(GOTEST) -v ./kv/test_raftstore -run ^TestPersistOneClient2B$ | grep PASS || true
# $(GOTEST) -v ./kv/test_raftstore -run ^TestPersistConcurrent2B$ | grep PASS || true
# $(GOTEST) -v ./kv/test_raftstore -run ^TestPersistConcurrentUnreliable2B$ | grep PASS || true
# $(GOTEST) -v ./kv/test_raftstore -run ^TestPersistPartition2B$ | grep PASS || true
# $(GOTEST) -v ./kv/test_raftstore -run ^TestPersistPartitionUnreliable2B$ | grep PASS || true
# $(TEST_CLEAN)
#project2b:
# $(TEST_CLEAN)
# $(GOTEST) -v ./kv/test_raftstore -run ^TestBasic2B$ | grep -E "PASS|ok|FAIL" || true
# $(GOTEST) -v ./kv/test_raftstore -run ^TestConcurrent2B$ | grep -E "PASS|ok|FAIL" || true
# $(GOTEST) -v ./kv/test_raftstore -run ^TestUnreliable2B$ | grep -E "PASS|ok|FAIL" || true
# $(GOTEST) -v ./kv/test_raftstore -run ^TestOnePartition2B$ | grep -E "PASS|ok|FAIL" || true
# $(GOTEST) -v ./kv/test_raftstore -run ^TestManyPartitionsOneClient2B$ | grep -E "PASS|ok|FAIL" || true
# $(GOTEST) -v ./kv/test_raftstore -run ^TestManyPartitionsManyClients2B$ | grep -E "PASS|ok|FAIL" || true
# $(GOTEST) -v ./kv/test_raftstore -run ^TestPersistOneClient2B$ | grep -E "PASS|ok|FAIL" || true
# $(GOTEST) -v ./kv/test_raftstore -run ^TestPersistConcurrent2B$ | grep -E "PASS|ok|FAIL" || true
# $(GOTEST) -v ./kv/test_raftstore -run ^TestPersistConcurrentUnreliable2B$ | grep -E "PASS|ok|FAIL" || true
# $(GOTEST) -v ./kv/test_raftstore -run ^TestPersistPartition2B$ | grep -E "PASS|ok|FAIL" || true
# $(GOTEST) -v ./kv/test_raftstore -run ^TestPersistPartitionUnreliable2B$ | grep -E "PASS|ok|FAIL" || true
# $(TEST_CLEAN)



project2c:
$(TEST_CLEAN)
Expand All @@ -94,6 +124,20 @@ project2c:
$(GOTEST) ./kv/test_raftstore -run ^TestSnapshotUnreliableRecoverConcurrentPartition2C$ || true
$(TEST_CLEAN)

project2c_SnapshotRecover:
$(TEST_CLEAN)
$(GOTEST) ./kv/test_raftstore -run ^TestSnapshotRecover2C$ || true
$(TEST_CLEAN)

project2c_TestSnapshotUnreliable5Times:
$(TEST_CLEAN)
$(GOTEST) ./kv/test_raftstore -run ^TestSnapshotUnreliable2C$ || true
$(GOTEST) ./kv/test_raftstore -run ^TestSnapshotUnreliable2C$ || true
$(GOTEST) ./kv/test_raftstore -run ^TestSnapshotUnreliable2C$ || true
$(GOTEST) ./kv/test_raftstore -run ^TestSnapshotUnreliable2C$ || true
$(GOTEST) ./kv/test_raftstore -run ^TestSnapshotUnreliable2C$ || true
$(TEST_CLEAN)

project3: project3a project3b project3c

project3a:
Expand Down
118 changes: 92 additions & 26 deletions kv/raftstore/peer_msg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/pingcap-incubator/tinykv/kv/raftstore/meta"
"github.com/pingcap-incubator/tinykv/kv/util/engine_util"
pb "github.com/pingcap-incubator/tinykv/proto/pkg/eraftpb"
"reflect"
"time"

"github.com/Connor1996/badger/y"
Expand Down Expand Up @@ -57,11 +58,29 @@ func (d *peerMsgHandler) HandleRaftReady() {
ready := d.RaftGroup.Ready()

// 调用 SaveReadyState 将 Ready 中需要持久化的内容保存到 badger
_, err := d.peerStorage.SaveReadyState(&ready)
applySnapResult, err := d.peerStorage.SaveReadyState(&ready)
if err != nil {
log.Panic(err)
}

if applySnapResult != nil { // 存在快照,则应用这个快照
if reflect.DeepEqual(applySnapResult.Region,applySnapResult.PrevRegion) == false {
d.peerStorage.SetRegion(applySnapResult.Region)

storeMeta := d.ctx.storeMeta

storeMeta.Lock()
storeMeta.regions[applySnapResult.Region.Id] = applySnapResult.Region
storeMeta.regionRanges.Delete(&regionItem{
region: applySnapResult.PrevRegion,
})
storeMeta.regionRanges.ReplaceOrInsert(&regionItem{
region: applySnapResult.Region,
})
storeMeta.Unlock()
}
}

// 调用 d.Send() 方法将 Ready 中的 Msg 发送出去
d.Send(d.ctx.trans, ready.Messages)

Expand Down Expand Up @@ -105,39 +124,44 @@ func (d *peerMsgHandler) HandleRaftReady() {
d.RaftGroup.Advance(ready)
}


func (d *peerMsgHandler) processCommittedEntry(entry *pb.Entry,writeBatch *engine_util.WriteBatch) *engine_util.WriteBatch {
//fmt.Println("processCommittedEntry called!")
//EntryType_EntryNormal (值为 0): 这种类型表示普通的日志条目,通常用于存储客户端的请求或命令。这些命令会被应用到状态机中,以保持集群的一致性。
//EntryType_EntryConfChange (值为 1): 这种类型表示配置变更日志条目,用于集群配置的更改,例如添加或删除节点。当这种类型的条目被提交时,Raft 节点会应用配置变更,以更新集群的成员信息。

if entry.EntryType == pb.EntryType_EntryConfChange { // 日志条目是配置变更条目
confChange := &pb.ConfChange{}

err := confChange.Unmarshal(entry.Data)
if err != nil {
log.Panic(err)
}
log.Infof("EntryType_EntryConfChange")
// Project3 TODO
//if entry.EntryType == pb.EntryType_EntryConfChange { // 日志条目是配置变更条目
// confChange := &pb.ConfChange{}
//
// err := confChange.Unmarshal(entry.Data)
// if err != nil {
// log.Panic(err)
// }
// log.Infof("EntryType_EntryConfChange")
//
// return d.processConfChange(entry,confChange,writeBatch)
//}

return d.processConfChange(entry,confChange,writeBatch)
}
// 反序列化 entry.Data 中的数据
request := &raft_cmdpb.RaftCmdRequest{}

err := request.Unmarshal(entry.Data)
if err != nil {
log.Panic(err)
}
// TODO
//if request.AdminRequest != nil {
// // return d.processAdminRequest(entry, requests, kvWB)
//} else {
// return d.processRequest(entry, request, writeBatch)
//}

return d.processRequest(entry, request, writeBatch)
if request.AdminRequest != nil {
return d.processAdminRequest(entry, request, writeBatch)
} else {
return d.processRequest(entry, request, writeBatch)
}


}



// processConfChange 处理配置变更类型的日志条目
func (d *peerMsgHandler) processConfChange(entry *pb.Entry,confChange *pb.ConfChange,writeBatch *engine_util.WriteBatch) *engine_util.WriteBatch {
// 反序列化 entry.Data 中的数据
Expand Down Expand Up @@ -191,6 +215,23 @@ func (d *peerMsgHandler) getPeerIndex(nodeId uint64) uint64 {
return uint64(len(d.peerStorage.region.Peers))
}

// processAdminRequest 处理 commit 的 Admin Request 类型 command
func (d *peerMsgHandler) processAdminRequest(entry *pb.Entry,request *raft_cmdpb.RaftCmdRequest,writeBatch *engine_util.WriteBatch) *engine_util.WriteBatch {
switch request.AdminRequest.CmdType {
case raft_cmdpb.AdminCmdType_InvalidAdmin:
break
case raft_cmdpb.AdminCmdType_CompactLog: // CompactLog 类型请求不需要将执行结果存储到 proposal 回调????
if request.AdminRequest.CompactLog.CompactIndex > d.peerStorage.applyState.TruncatedState.Index {
d.peerStorage.applyState.TruncatedState.Index = request.AdminRequest.CompactLog.CompactIndex
d.peerStorage.applyState.TruncatedState.Term = request.AdminRequest.CompactLog.CompactTerm
// 调度日志截断任务
d.ScheduleCompactLog(request.AdminRequest.CompactLog.CompactIndex)
}
// TODO other cases: AdminCmdType_ChangePeer、AdminCmdType_TransferLeader、AdminCmdType_Split
}

return writeBatch
}
// processRequest 处理 commit 的 Put/Get/Delete/Snap 类型 command
func (d *peerMsgHandler) processRequest(entry *pb.Entry,request *raft_cmdpb.RaftCmdRequest,writeBatch *engine_util.WriteBatch) *engine_util.WriteBatch {
raftCmdResponse := &raft_cmdpb.RaftCmdResponse{
Expand Down Expand Up @@ -287,7 +328,7 @@ func (d *peerMsgHandler) processRequest(entry *pb.Entry,request *raft_cmdpb.Raft
BindRespError(raftCmdResponse,err)
continue
}
// Get 和 Snap 请求需要先将结果写到 DB,否 则的话如果有多个 entry 同时被 apply,客户端无法及时看到写入的结果 ??
// Get 和 Snap 请求需要先将结果写到 DB,否则的话如果有多个 entry 同时被 apply,客户端无法及时看到写入的结果 ??
writeBatch.MustWriteToDB(d.peerStorage.Engines.Kv)
writeBatch = &engine_util.WriteBatch{}
/*
Expand All @@ -300,9 +341,8 @@ func (d *peerMsgHandler) processRequest(entry *pb.Entry,request *raft_cmdpb.Raft
*/
raftCmdResponse.Responses = append(raftCmdResponse.Responses, &raft_cmdpb.Response{
CmdType: raft_cmdpb.CmdType_Snap,
Snap: &raft_cmdpb.SnapResponse{Region: d.Region()},
Snap: &raft_cmdpb.SnapResponse{Region: d.Region()},
})
// Snap TODO
}
}

Expand All @@ -327,13 +367,25 @@ func (d *peerMsgHandler) processProposal(entry *pb.Entry, raftCmdResponse *raft_
}
curProposal.cb.Done(raftCmdResponse)
d.proposals = d.proposals[1:]
continue

}

return
}
//plen := len(d.proposals)
//for idx := 0; idx < plen; idx++ {
// curProposal := d.proposals[idx]
// if curProposal.term == entry.Term && curProposal.index == entry.Index {
// if curProposal.cb != nil {
// curProposal.cb.Txn = d.peerStorage.Engines.Kv.NewTransaction(false)
// }
// curProposal.cb.Done(raftCmdResponse)
// return
// }
//}
}


func (d *peerMsgHandler) HandleMsg(msg message.Msg) {
switch msg.Type {
case message.MsgTypeRaftMessage:
Expand Down Expand Up @@ -396,6 +448,9 @@ func (d *peerMsgHandler) preProposeRaftCommand(req *raft_cmdpb.RaftCmdRequest) e
return err
}




//将 client 的请求包装成 entry 传递给 raft 层
func (d *peerMsgHandler) proposeRaftCommand(msg *raft_cmdpb.RaftCmdRequest, cb *message.Callback) {
err := d.preProposeRaftCommand(msg)
Expand All @@ -404,7 +459,7 @@ func (d *peerMsgHandler) proposeRaftCommand(msg *raft_cmdpb.RaftCmdRequest, cb *
return
}
// Your Code Here (2B).
if msg.Requests != nil {
if msg.Requests != nil { // 普通请求
// 封装回调函数 callback
curProposal := &proposal{
index: d.RaftGroup.Raft.RaftLog.LastIndex() + 1,
Expand All @@ -425,8 +480,19 @@ func (d *peerMsgHandler) proposeRaftCommand(msg *raft_cmdpb.RaftCmdRequest, cb *
if err != nil {
log.Panic(err)
}
} else {
// TODO
} else if msg.AdminRequest != nil { // 管理员请求
switch msg.AdminRequest.CmdType {
case raft_cmdpb.AdminCmdType_CompactLog: // 日志压缩需要提交到 raft 同步
marshalRes, err := msg.Marshal()
if err != nil {
log.Panic(err)
}
err = d.RaftGroup.Propose(marshalRes)
if err != nil {
log.Panic(err)
}
// TODO other cases
}
}

}
Expand Down Expand Up @@ -885,4 +951,4 @@ func newCompactLogRequest(regionID uint64, peer *metapb.Peer, compactIndex, comp
},
}
return req
}
}
53 changes: 47 additions & 6 deletions kv/raftstore/peer_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,48 @@ func (ps *PeerStorage) ApplySnapshot(snapshot *eraftpb.Snapshot, kvWB *engine_ut
// and send RegionTaskApply task to region worker through ps.regionSched, also remember call ps.clearMeta
// and ps.clearExtraData to delete stale data
// Your Code Here (2C).
// 这里需要做的事情包括:更新对等存储状态,如raftState和applyState等,
// 并通过ps.regionSched将RegionTaskApply任务发送给区域工作者,还请记住调用ps.clearMeta
// 和ps.clearExtraData删除过时数据

return nil, nil
// 删除过时数据
if ps.isInitialized() {
err := ps.clearMeta(kvWB,raftWB)
if err != nil {
log.Panic(err)
}
ps.clearExtraData(snapData.Region)
}

// 更新 peer_storage 的内存状态
ps.raftState.LastIndex = snapshot.Metadata.Index
ps.raftState.LastTerm = snapshot.Metadata.Term
ps.applyState.AppliedIndex = snapshot.Metadata.Index
ps.applyState.TruncatedState.Index = snapshot.Metadata.Index
ps.applyState.TruncatedState.Term = snapshot.Metadata.Term
ps.snapState.StateType = snap.SnapState_Applying

err := kvWB.SetMeta(meta.ApplyStateKey(ps.region.Id),ps.applyState)
if err != nil {
log.Panic(err)
}

// 发送 runner.RegionTaskApply 任务给 region worker,并等待处理完毕
ch := make(chan bool, 1)
ps.regionSched <- &runner.RegionTaskApply{
RegionId: ps.region.Id,
Notifier: ch,
SnapMeta: snapshot.Metadata,
StartKey: snapData.Region.GetStartKey(),
EndKey: snapData.Region.GetEndKey(),
}
<-ch
result := &ApplySnapResult{
PrevRegion: ps.region,
Region: snapData.Region,
}
meta.WriteRegionState(kvWB, snapData.Region, rspb.PeerState_Normal)
return result, nil

}

Expand All @@ -367,11 +407,11 @@ func (ps *PeerStorage) SaveReadyState(ready *raft.Ready) (*ApplySnapResult, erro
var applySnapResult *ApplySnapResult
// 通过 raft.isEmptySnap() 方法判断是否存在 Snapshot,如果有,则调用ApplySnapshot() 方法应用;
// Snap TODO
//if raft.IsEmptySnap(&ready.Snapshot) {
// kvWriteBatch := &engine_util.WriteBatch{}
// applySnapResult,err = ps.ApplySnapshot(&ready.Snapshot,kvWriteBatch,writeBatch)
// kvWriteBatch.MustWriteToDB(ps.Engines.Kv)
//}
if raft.IsEmptySnap(&ready.Snapshot) == false {
kvWriteBatch := &engine_util.WriteBatch{}
applySnapResult,err = ps.ApplySnapshot(&ready.Snapshot,kvWriteBatch,writeBatch)
kvWriteBatch.MustWriteToDB(ps.Engines.Kv)
}

// 调用 Append 方法将需要持久化的 Entry 保存到 writeBatch
err = ps.Append(ready.Entries,writeBatch)
Expand All @@ -394,6 +434,7 @@ func (ps *PeerStorage) SaveReadyState(ready *raft.Ready) (*ApplySnapResult, erro
// 通过 writeBatch.WriteToDB 和 kvWB.WriteToDB 进行原子的写入到存储引擎
writeBatch.MustWriteToDB(ps.Engines.Raft)


return applySnapResult, nil
}

Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit 1c88ea4

Please sign in to comment.