diff --git a/internal/metarepos/raft_metadata_repository.go b/internal/metarepos/raft_metadata_repository.go index 03bae46ae..ae26e2cbf 100644 --- a/internal/metarepos/raft_metadata_repository.go +++ b/internal/metarepos/raft_metadata_repository.go @@ -636,16 +636,15 @@ func (mr *RaftMetadataRepository) applyUnregisterTopic(r *mrpb.UnregisterTopic, return verrors.ErrNotExist } -UnregisterLS: for _, lsID := range topic.LogStreams { ls := mr.storage.lookupLogStream(lsID) if ls == nil { - continue UnregisterLS + continue } err := mr.storage.unregisterLogStream(lsID) if err != nil { - continue UnregisterLS + continue } for _, replica := range ls.Replicas { @@ -656,8 +655,6 @@ UnregisterLS: mr.logger.Panic("could not unregister reporter", zap.String("err", err.Error())) } } - - return nil } err := mr.storage.UnregisterTopic(r.TopicID, nodeIndex, requestIndex) diff --git a/internal/metarepos/raft_metadata_repository_test.go b/internal/metarepos/raft_metadata_repository_test.go index b19eef0a6..c9988c664 100644 --- a/internal/metarepos/raft_metadata_repository_test.go +++ b/internal/metarepos/raft_metadata_repository_test.go @@ -2637,6 +2637,67 @@ func TestMRUnregisterTopic(t *testing.T) { meta, _ = clus.nodes[0].GetMetadata(context.TODO()) So(len(meta.GetLogStreams()), ShouldEqual, 0) }) + + Convey("Given 1 topic & 1000 log streams", t, func(ctx C) { + rep := 1 + nrNodes := 1 + nrLS := 1000 + topicID := types.TopicID(1) + + clus := newMetadataRepoCluster(nrNodes, rep, false) + Reset(func() { + clus.closeNoErrors(t) + }) + + mr := clus.nodes[0] + + So(clus.Start(), ShouldBeNil) + So(testutil.CompareWaitN(10, func() bool { + return clus.healthCheckAll() + }), ShouldBeTrue) + + snIDs := make([]types.StorageNodeID, rep) + for i := range snIDs { + snIDs[i] = types.StorageNodeID(i) + + sn := &varlogpb.StorageNodeDescriptor{ + StorageNode: varlogpb.StorageNode{ + StorageNodeID: snIDs[i], + }, + } + + err := mr.RegisterStorageNode(context.TODO(), sn) + So(err, ShouldBeNil) + } + + err := mr.RegisterTopic(context.TODO(), topicID) + So(err, ShouldBeNil) + + lsIDs := make([]types.LogStreamID, nrLS) + for i := range lsIDs { + lsIDs[i] = types.MinLogStreamID + types.LogStreamID(i) + } + + for _, lsID := range lsIDs { + ls := makeLogStream(types.TopicID(1), lsID, snIDs) + err := mr.RegisterLogStream(context.TODO(), ls) + So(err, ShouldBeNil) + } + + meta, _ := mr.GetMetadata(context.TODO()) + So(len(meta.GetLogStreams()), ShouldEqual, nrLS) + + // unregistering topic requires one raft consensus and pure in-memory + // state machine operations, so it should be done in less than 500ms. + require.EventuallyWithT(t, func(collect *assert.CollectT) { + err = mr.UnregisterTopic(context.TODO(), topicID) + require.NoError(collect, err) + }, 500*time.Millisecond, 10*time.Millisecond) + So(err, ShouldBeNil) + + meta, _ = mr.GetMetadata(context.TODO()) + So(len(meta.GetLogStreams()), ShouldEqual, 0) + }) } func TestMetadataRepository_MaxTopicsCount(t *testing.T) {