From d29c29a044fb72ff7c1267620d0fa58bf9ab5741 Mon Sep 17 00:00:00 2001 From: "pharrell.jang" Date: Wed, 29 Nov 2023 15:27:29 +0900 Subject: [PATCH 1/2] feat(metarepos): unseal reportCollector with LastCommittedVer - 1 --- .../dummy_storagenode_client_factory_impl.go | 7 ++ .../metarepos/raft_metadata_repository.go | 7 +- .../raft_metadata_repository_test.go | 92 +++++++++++++++++++ 3 files changed, 105 insertions(+), 1 deletion(-) diff --git a/internal/metarepos/dummy_storagenode_client_factory_impl.go b/internal/metarepos/dummy_storagenode_client_factory_impl.go index e4be67502..5ae8262bc 100644 --- a/internal/metarepos/dummy_storagenode_client_factory_impl.go +++ b/internal/metarepos/dummy_storagenode_client_factory_impl.go @@ -100,6 +100,7 @@ type DummyStorageNodeClient struct { logStreamIDs []types.LogStreamID knownVersion []types.Version + knownHWM []types.GLSN uncommittedLLSNOffset []types.LLSN uncommittedLLSNLength []uint64 @@ -169,6 +170,7 @@ func (fac *DummyStorageNodeClientFactory) getStorageNodeClient(_ context.Context } knownVersion := make([]types.Version, fac.nrLogStreams) + knownHWM := make([]types.GLSN, fac.nrLogStreams) uncommittedLLSNOffset := make([]types.LLSN, fac.nrLogStreams) for i := 0; i < fac.nrLogStreams; i++ { @@ -182,6 +184,7 @@ func (fac *DummyStorageNodeClientFactory) getStorageNodeClient(_ context.Context storageNodeID: snID, logStreamIDs: lsIDs, knownVersion: knownVersion, + knownHWM: knownHWM, uncommittedLLSNOffset: uncommittedLLSNOffset, uncommittedLLSNLength: uncommittedLLSNLength, status: status, @@ -280,6 +283,7 @@ func (r *DummyStorageNodeClient) GetReport() (*snpb.GetReportResponse, error) { u := snpb.LogStreamUncommitReport{ LogStreamID: lsID, Version: r.knownVersion[i], + HighWatermark: r.knownHWM[i], UncommittedLLSNOffset: r.uncommittedLLSNOffset[i], UncommittedLLSNLength: r.uncommittedLLSNLength[i], } @@ -321,6 +325,7 @@ func (r *DummyStorageNodeClient) Commit(cr snpb.CommitRequest) error { } r.knownVersion[idx] = cr.CommitResult.Version + r.knownHWM[idx] = cr.CommitResult.HighWatermark r.uncommittedLLSNOffset[idx] += types.LLSN(cr.CommitResult.CommittedGLSNLength) r.uncommittedLLSNLength[idx] -= cr.CommitResult.CommittedGLSNLength @@ -408,6 +413,7 @@ func (r *DummyStorageNodeClient) makeInvalid(idx int) { defer r.mu.Unlock() r.knownVersion[idx] = 0 + r.knownHWM[idx] = 0 r.uncommittedLLSNOffset[idx] = 0 } @@ -442,6 +448,7 @@ func (fac *DummyStorageNodeClientFactory) recoverRPC(snID types.StorageNodeID) { storageNodeID: old.storageNodeID, logStreamIDs: old.logStreamIDs, knownVersion: old.knownVersion, + knownHWM: old.knownHWM, uncommittedLLSNOffset: old.uncommittedLLSNOffset, uncommittedLLSNLength: old.uncommittedLLSNLength, status: DummyStorageNodeClientStatusRunning, diff --git a/internal/metarepos/raft_metadata_repository.go b/internal/metarepos/raft_metadata_repository.go index 5a1c6c208..326700f8c 100644 --- a/internal/metarepos/raft_metadata_repository.go +++ b/internal/metarepos/raft_metadata_repository.go @@ -991,7 +991,12 @@ func (mr *RaftMetadataRepository) applyUnseal(r *mrpb.Unseal, nodeIndex, request return err } - mr.reportCollector.Unseal(r.LogStreamID, mr.GetLastCommitVersion()) + ver := mr.GetLastCommitVersion() + if ver > 0 { + // Let logStream know cur version. + ver = ver - 1 + } + mr.reportCollector.Unseal(r.LogStreamID, ver) return nil } diff --git a/internal/metarepos/raft_metadata_repository_test.go b/internal/metarepos/raft_metadata_repository_test.go index b291f13cf..5d6e36427 100644 --- a/internal/metarepos/raft_metadata_repository_test.go +++ b/internal/metarepos/raft_metadata_repository_test.go @@ -2973,6 +2973,98 @@ func TestMRTopicCatchup(t *testing.T) { }) } +func TestMRCatchupUnsealedLogstream(t *testing.T) { + Convey("Given MR cluster", t, func(ctx C) { + nrRep := 1 + nrTopic := 1 + nrSN := 2 + clus := newMetadataRepoCluster(1, nrRep, false) + Reset(func() { + clus.closeNoErrors(t) + }) + + So(clus.Start(), ShouldBeNil) + So(testutil.CompareWaitN(10, func() bool { + return clus.healthCheckAll() + }), ShouldBeTrue) + + mr := clus.nodes[0] + + // register SN & LS + err := clus.initDummyStorageNode(nrSN, nrTopic) + So(err, ShouldBeNil) + + err = mr.RegisterTopic(context.TODO(), types.TopicID(1)) + So(err, ShouldBeNil) + + So(testutil.CompareWaitN(50, func() bool { + return len(clus.getSNIDs()) == nrSN + }), ShouldBeTrue) + + Convey("After all ls belonging to the cluster are sealed, when a specific ls is unsealed", func(ctx C) { + // append to LS[0] + snIDs := clus.getSNIDs() + snCli := clus.reporterClientFac.(*DummyStorageNodeClientFactory).lookupClient(snIDs[0]) + snCli.increaseUncommitted(0) + + So(testutil.CompareWaitN(50, func() bool { + return snCli.numUncommitted(0) == 0 + }), ShouldBeTrue) + + // seal LS[0] + _, err = mr.Seal(context.TODO(), types.LogStreamID(snIDs[0])) + So(err, ShouldBeNil) + + So(testutil.CompareWaitN(10, func() bool { + meta, err := mr.GetMetadata(context.TODO()) + if err != nil { + return false + } + + ls := meta.GetLogStream(types.LogStreamID(snIDs[0])) + return ls.Status == varlogpb.LogStreamStatusSealed + }), ShouldBeTrue) + + // append to LS[1] + snCli = clus.reporterClientFac.(*DummyStorageNodeClientFactory).lookupClient(snIDs[1]) + snCli.increaseUncommitted(0) + + So(testutil.CompareWaitN(50, func() bool { + return snCli.numUncommitted(0) == 0 + }), ShouldBeTrue) + + // seal LS[1] + _, err = mr.Seal(context.TODO(), types.LogStreamID(snIDs[1])) + So(err, ShouldBeNil) + + So(testutil.CompareWaitN(10, func() bool { + meta, err := mr.GetMetadata(context.TODO()) + if err != nil { + return false + } + + ls := meta.GetLogStream(types.LogStreamID(snIDs[1])) + return ls.Status == varlogpb.LogStreamStatusSealed + }), ShouldBeTrue) + + // unseal LS[0] + err = mr.Unseal(context.TODO(), types.LogStreamID(snIDs[0])) + So(err, ShouldBeNil) + + Convey("Then the ls should appenable", func(ctx C) { + // append to LS[0] + snCli = clus.reporterClientFac.(*DummyStorageNodeClientFactory).lookupClient(snIDs[0]) + snCli.increaseUncommitted(0) + + So(testutil.CompareWaitN(50, func() bool { + return snCli.numUncommitted(0) == 0 + }), ShouldBeTrue) + }) + }) + + }) +} + func TestMain(m *testing.M) { goleak.VerifyTestMain(m, goleak.IgnoreTopFunction( From c072e4ddd8955e61720eb2057f00dc3b25d0071c Mon Sep 17 00:00:00 2001 From: "pharrell.jang" Date: Tue, 26 Dec 2023 20:28:47 +0900 Subject: [PATCH 2/2] chore: add comment --- internal/metarepos/raft_metadata_repository.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/metarepos/raft_metadata_repository.go b/internal/metarepos/raft_metadata_repository.go index 326700f8c..e60ef876f 100644 --- a/internal/metarepos/raft_metadata_repository.go +++ b/internal/metarepos/raft_metadata_repository.go @@ -993,7 +993,10 @@ func (mr *RaftMetadataRepository) applyUnseal(r *mrpb.Unseal, nodeIndex, request ver := mr.GetLastCommitVersion() if ver > 0 { - // Let logStream know cur version. + // Let logstream know the last commit version. + // This tricks the committer into thinking that the last version + // that storagenode knows is last commit version - 1. + // So, committer delivers last commit result to storagenode. ver = ver - 1 } mr.reportCollector.Unseal(r.LogStreamID, ver)