Skip to content

Commit

Permalink
fix: unregistering topic translates to multiple redundant Raft proposals
Browse files Browse the repository at this point in the history
While a topic unregistration was being applied, an early `return nil`
inside the log stream loop exited the function as soon as one log stream
had been unregistered, which prevented cacheCompleteCB from being called.
Without this callback, proposeWithGuarantee interpreted the operation as
incomplete and retried the entire topic unregistration multiple times. As
a result, unregistration time scaled linearly with the number of log
streams (approximately 100ms per log stream) and each retry added
consensus overhead.

Remove the premature return to ensure cacheCompleteCB is called after all
log streams are processed, allowing the entire operation to be handled in
a single Raft proposal.
  • Loading branch information
kakao-mark-yun authored and ijsong committed Feb 3, 2025
1 parent cc75160 commit 16093f0
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 5 deletions.
7 changes: 2 additions & 5 deletions internal/metarepos/raft_metadata_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,16 +636,15 @@ func (mr *RaftMetadataRepository) applyUnregisterTopic(r *mrpb.UnregisterTopic,
return verrors.ErrNotExist
}

UnregisterLS:
for _, lsID := range topic.LogStreams {
ls := mr.storage.lookupLogStream(lsID)
if ls == nil {
continue UnregisterLS
continue

Check warning on line 642 in internal/metarepos/raft_metadata_repository.go

View check run for this annotation

Codecov / codecov/patch

internal/metarepos/raft_metadata_repository.go#L642

Added line #L642 was not covered by tests
}

err := mr.storage.unregisterLogStream(lsID)
if err != nil {
continue UnregisterLS
continue

Check warning on line 647 in internal/metarepos/raft_metadata_repository.go

View check run for this annotation

Codecov / codecov/patch

internal/metarepos/raft_metadata_repository.go#L647

Added line #L647 was not covered by tests
}

for _, replica := range ls.Replicas {
Expand All @@ -656,8 +655,6 @@ UnregisterLS:
mr.logger.Panic("could not unregister reporter", zap.String("err", err.Error()))
}
}

return nil
}

err := mr.storage.UnregisterTopic(r.TopicID, nodeIndex, requestIndex)
Expand Down
61 changes: 61 additions & 0 deletions internal/metarepos/raft_metadata_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2637,6 +2637,67 @@ func TestMRUnregisterTopic(t *testing.T) {
meta, _ = clus.nodes[0].GetMetadata(context.TODO())
So(len(meta.GetLogStreams()), ShouldEqual, 0)
})

Convey("Given 1 topic & 1000 log streams", t, func(ctx C) {
rep := 1
nrNodes := 1
nrLS := 1000
topicID := types.TopicID(1)

clus := newMetadataRepoCluster(nrNodes, rep, false)
Reset(func() {
clus.closeNoErrors(t)
})

mr := clus.nodes[0]

So(clus.Start(), ShouldBeNil)
So(testutil.CompareWaitN(10, func() bool {
return clus.healthCheckAll()
}), ShouldBeTrue)

snIDs := make([]types.StorageNodeID, rep)
for i := range snIDs {
snIDs[i] = types.StorageNodeID(i)

sn := &varlogpb.StorageNodeDescriptor{
StorageNode: varlogpb.StorageNode{
StorageNodeID: snIDs[i],
},
}

err := mr.RegisterStorageNode(context.TODO(), sn)
So(err, ShouldBeNil)
}

err := mr.RegisterTopic(context.TODO(), topicID)
So(err, ShouldBeNil)

lsIDs := make([]types.LogStreamID, nrLS)
for i := range lsIDs {
lsIDs[i] = types.MinLogStreamID + types.LogStreamID(i)
}

for _, lsID := range lsIDs {
ls := makeLogStream(types.TopicID(1), lsID, snIDs)
err := mr.RegisterLogStream(context.TODO(), ls)
So(err, ShouldBeNil)
}

meta, _ := mr.GetMetadata(context.TODO())
So(len(meta.GetLogStreams()), ShouldEqual, nrLS)

// Unregistering a topic requires a single Raft proposal plus pure
// in-memory state machine operations, so it should complete in less than 500ms.
require.EventuallyWithT(t, func(collect *assert.CollectT) {
err = mr.UnregisterTopic(context.TODO(), topicID)
require.NoError(collect, err)
}, 500*time.Millisecond, 10*time.Millisecond)
So(err, ShouldBeNil)

meta, _ = mr.GetMetadata(context.TODO())
So(len(meta.GetLogStreams()), ShouldEqual, 0)
})
}

func TestMetadataRepository_MaxTopicsCount(t *testing.T) {
Expand Down

0 comments on commit 16093f0

Please sign in to comment.