Skip to content

Commit

Permalink
Merge pull request #647 from kakao/avoid-report-ignore
Browse files Browse the repository at this point in the history
feat(metarepos): unseal reportCollector with LastCommittedVer - 1
  • Loading branch information
hungryjang authored Dec 28, 2023
2 parents 8732632 + c072e4d commit bb45410
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 1 deletion.
7 changes: 7 additions & 0 deletions internal/metarepos/dummy_storagenode_client_factory_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ type DummyStorageNodeClient struct {

logStreamIDs []types.LogStreamID
knownVersion []types.Version
knownHWM []types.GLSN
uncommittedLLSNOffset []types.LLSN
uncommittedLLSNLength []uint64

Expand Down Expand Up @@ -169,6 +170,7 @@ func (fac *DummyStorageNodeClientFactory) getStorageNodeClient(_ context.Context
}

knownVersion := make([]types.Version, fac.nrLogStreams)
knownHWM := make([]types.GLSN, fac.nrLogStreams)

uncommittedLLSNOffset := make([]types.LLSN, fac.nrLogStreams)
for i := 0; i < fac.nrLogStreams; i++ {
Expand All @@ -182,6 +184,7 @@ func (fac *DummyStorageNodeClientFactory) getStorageNodeClient(_ context.Context
storageNodeID: snID,
logStreamIDs: lsIDs,
knownVersion: knownVersion,
knownHWM: knownHWM,
uncommittedLLSNOffset: uncommittedLLSNOffset,
uncommittedLLSNLength: uncommittedLLSNLength,
status: status,
Expand Down Expand Up @@ -280,6 +283,7 @@ func (r *DummyStorageNodeClient) GetReport() (*snpb.GetReportResponse, error) {
u := snpb.LogStreamUncommitReport{
LogStreamID: lsID,
Version: r.knownVersion[i],
HighWatermark: r.knownHWM[i],
UncommittedLLSNOffset: r.uncommittedLLSNOffset[i],
UncommittedLLSNLength: r.uncommittedLLSNLength[i],
}
Expand Down Expand Up @@ -321,6 +325,7 @@ func (r *DummyStorageNodeClient) Commit(cr snpb.CommitRequest) error {
}

r.knownVersion[idx] = cr.CommitResult.Version
r.knownHWM[idx] = cr.CommitResult.HighWatermark

r.uncommittedLLSNOffset[idx] += types.LLSN(cr.CommitResult.CommittedGLSNLength)
r.uncommittedLLSNLength[idx] -= cr.CommitResult.CommittedGLSNLength
Expand Down Expand Up @@ -408,6 +413,7 @@ func (r *DummyStorageNodeClient) makeInvalid(idx int) {
defer r.mu.Unlock()

r.knownVersion[idx] = 0
r.knownHWM[idx] = 0
r.uncommittedLLSNOffset[idx] = 0
}

Expand Down Expand Up @@ -442,6 +448,7 @@ func (fac *DummyStorageNodeClientFactory) recoverRPC(snID types.StorageNodeID) {
storageNodeID: old.storageNodeID,
logStreamIDs: old.logStreamIDs,
knownVersion: old.knownVersion,
knownHWM: old.knownHWM,
uncommittedLLSNOffset: old.uncommittedLLSNOffset,
uncommittedLLSNLength: old.uncommittedLLSNLength,
status: DummyStorageNodeClientStatusRunning,
Expand Down
10 changes: 9 additions & 1 deletion internal/metarepos/raft_metadata_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,15 @@ func (mr *RaftMetadataRepository) applyUnseal(r *mrpb.Unseal, nodeIndex, request
return err
}

mr.reportCollector.Unseal(r.LogStreamID, mr.GetLastCommitVersion())
ver := mr.GetLastCommitVersion()
if ver > 0 {
// Let logstream know the last commit version.
// This tricks the committer into thinking that the last version
// that storagenode knows is last commit version - 1.
// So, committer delivers last commit result to storagenode.
ver = ver - 1
}
mr.reportCollector.Unseal(r.LogStreamID, ver)

return nil
}
Expand Down
92 changes: 92 additions & 0 deletions internal/metarepos/raft_metadata_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2973,6 +2973,98 @@ func TestMRTopicCatchup(t *testing.T) {
})
}

func TestMRCatchupUnsealedLogstream(t *testing.T) {
Convey("Given MR cluster", t, func(ctx C) {
nrRep := 1
nrTopic := 1
nrSN := 2
clus := newMetadataRepoCluster(1, nrRep, false)
Reset(func() {
clus.closeNoErrors(t)
})

So(clus.Start(), ShouldBeNil)
So(testutil.CompareWaitN(10, func() bool {
return clus.healthCheckAll()
}), ShouldBeTrue)

mr := clus.nodes[0]

// register SN & LS
err := clus.initDummyStorageNode(nrSN, nrTopic)
So(err, ShouldBeNil)

err = mr.RegisterTopic(context.TODO(), types.TopicID(1))
So(err, ShouldBeNil)

So(testutil.CompareWaitN(50, func() bool {
return len(clus.getSNIDs()) == nrSN
}), ShouldBeTrue)

Convey("After all ls belonging to the cluster are sealed, when a specific ls is unsealed", func(ctx C) {
// append to LS[0]
snIDs := clus.getSNIDs()
snCli := clus.reporterClientFac.(*DummyStorageNodeClientFactory).lookupClient(snIDs[0])
snCli.increaseUncommitted(0)

So(testutil.CompareWaitN(50, func() bool {
return snCli.numUncommitted(0) == 0
}), ShouldBeTrue)

// seal LS[0]
_, err = mr.Seal(context.TODO(), types.LogStreamID(snIDs[0]))
So(err, ShouldBeNil)

So(testutil.CompareWaitN(10, func() bool {
meta, err := mr.GetMetadata(context.TODO())
if err != nil {
return false
}

ls := meta.GetLogStream(types.LogStreamID(snIDs[0]))
return ls.Status == varlogpb.LogStreamStatusSealed
}), ShouldBeTrue)

// append to LS[1]
snCli = clus.reporterClientFac.(*DummyStorageNodeClientFactory).lookupClient(snIDs[1])
snCli.increaseUncommitted(0)

So(testutil.CompareWaitN(50, func() bool {
return snCli.numUncommitted(0) == 0
}), ShouldBeTrue)

// seal LS[1]
_, err = mr.Seal(context.TODO(), types.LogStreamID(snIDs[1]))
So(err, ShouldBeNil)

So(testutil.CompareWaitN(10, func() bool {
meta, err := mr.GetMetadata(context.TODO())
if err != nil {
return false
}

ls := meta.GetLogStream(types.LogStreamID(snIDs[1]))
return ls.Status == varlogpb.LogStreamStatusSealed
}), ShouldBeTrue)

// unseal LS[0]
err = mr.Unseal(context.TODO(), types.LogStreamID(snIDs[0]))
So(err, ShouldBeNil)

Convey("Then the ls should appenable", func(ctx C) {
// append to LS[0]
snCli = clus.reporterClientFac.(*DummyStorageNodeClientFactory).lookupClient(snIDs[0])
snCli.increaseUncommitted(0)

So(testutil.CompareWaitN(50, func() bool {
return snCli.numUncommitted(0) == 0
}), ShouldBeTrue)
})
})

})
}

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m,
goleak.IgnoreTopFunction(
Expand Down

0 comments on commit bb45410

Please sign in to comment.