Skip to content

Commit

Permalink
fix: client rpcserver subscriber hang (#3246)
Browse files Browse the repository at this point in the history
* fix: client rpcserver subscriber hang

Signed-off-by: Jim Ma <[email protected]>

* chore: optimize rpcserver subscriber logic

Signed-off-by: Jim Ma <[email protected]>

* chore: update comment

Signed-off-by: Jim Ma <[email protected]>

* chore: optimize isAllPiecesSent

Signed-off-by: Jim Ma <[email protected]>

---------

Signed-off-by: Jim Ma <[email protected]>
  • Loading branch information
jim3ma authored May 7, 2024
1 parent 22f46b7 commit 1e6f09d
Showing 1 changed file with 42 additions and 20 deletions.
62 changes: 42 additions & 20 deletions client/daemon/rpcserver/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,13 @@ func searchNextPieceNum(sentMap map[int32]struct{}, cur uint32) (nextPieceNum ui
}

// sendExistPieces will send as much as possible pieces
func (s *subscriber) sendExistPieces(startNum uint32) (total int32, err error) {
func (s *subscriber) sendExistPieces(startNum uint32) error {
s.request.StartNum = startNum
return sendExistPieces(s.sync.Context(), s.SugaredLoggerOnWith, s.getPieces, s.request, s.sync, s.sentMap, true)
total, err := sendExistPieces(s.sync.Context(), s.SugaredLoggerOnWith, s.getPieces, s.request, s.sync, s.sentMap, true)
if total > -1 && s.isUnknownTotalPieces() {
s.totalPieces = total
}
return err
}

func (s *subscriber) receiveRemainingPieceTaskRequests() {
Expand Down Expand Up @@ -161,9 +165,23 @@ func (s *subscriber) receiveRemainingPieceTaskRequests() {
}
}

// totalPieces is -1, 0, n
func (s *subscriber) isKnownTotalPieces() bool {
return s.totalPieces > -1
}

func (s *subscriber) isUnknownTotalPieces() bool {
return !s.isKnownTotalPieces()
}

func (s *subscriber) isAllPiecesSent(nextPieceNum uint32) bool {
return nextPieceNum == uint32(s.totalPieces)
}

func (s *subscriber) sendRemainingPieceTasks() error {
// nextPieceNum is the least piece num which did not send to remote peer
// may great then total piece count, check the total piece count when use it
// available values: [0, n], n is total piece count
// when nextPieceNum is n, indicate all pieces done
var nextPieceNum uint32
s.Lock()
for i := int32(s.skipPieceCount); ; i++ {
Expand All @@ -173,6 +191,7 @@ func (s *subscriber) sendRemainingPieceTasks() error {
}
}
s.Unlock()
s.Debugf("desired next piece num: %d", nextPieceNum)
loop:
for {
select {
Expand All @@ -182,51 +201,54 @@ loop:
case info := <-s.PieceInfoChannel:
s.Infof("receive piece info, num: %d, finished: %v", info.Num, info.Finished)
// not desired piece
if s.totalPieces > -1 && uint32(info.Num) < nextPieceNum {
if uint32(info.Num) < nextPieceNum {
continue
}

s.Lock()
total, err := s.sendExistPieces(uint32(info.Num))
err := s.sendExistPieces(uint32(info.Num))
if err != nil {
err = s.saveError(err)
s.Unlock()
return err
}
if total > -1 && s.totalPieces == -1 {
s.totalPieces = total
}
if s.totalPieces > -1 && len(s.sentMap)+int(s.skipPieceCount) == int(s.totalPieces) {
s.Unlock()
break loop
}
if info.Finished {

nextPieceNum = s.searchNextPieceNum(nextPieceNum)
s.Debugf("update desired next piece num: %d", nextPieceNum)

if info.Finished && s.isAllPiecesSent(nextPieceNum) {
s.Unlock()
break loop
}
nextPieceNum = s.searchNextPieceNum(nextPieceNum)
s.Unlock()
case <-s.Success:
s.Infof("peer task is success, send remaining pieces")
s.Lock()
// all pieces already sent
// empty piece task will reach sendExistPieces to sync content length and piece count
if s.totalPieces > 0 && nextPieceNum == uint32(s.totalPieces) {
if s.totalPieces > 0 && s.isAllPiecesSent(nextPieceNum) {
s.Unlock()
break loop
}
total, err := s.sendExistPieces(nextPieceNum)

err := s.sendExistPieces(nextPieceNum)
if err != nil {
err = s.saveError(err)
s.Unlock()
return err
}
if total > -1 && s.totalPieces == -1 {
s.totalPieces = total

if s.isUnknownTotalPieces() {
s.Unlock()
msg := "task success, but total pieces is unknown"
s.Errorf(msg)
return dferrors.Newf(commonv1.Code_ClientError, msg)
}
if s.totalPieces > -1 && len(s.sentMap)+int(s.skipPieceCount) != int(s.totalPieces) {

nextPieceNum = s.searchNextPieceNum(nextPieceNum)
if !s.isAllPiecesSent(nextPieceNum) {
s.Unlock()
msg := "peer task success, but can not send all pieces"
msg := "task success, but not all pieces are sent out"
s.Errorf(msg)
return dferrors.Newf(commonv1.Code_ClientError, msg)
}
Expand Down

0 comments on commit 1e6f09d

Please sign in to comment.