Skip to content

Commit

Permalink
fix possible deadlock in node_processor #42
Browse files Browse the repository at this point in the history
  • Loading branch information
chengshiwen committed Aug 25, 2024
1 parent 049ab0f commit 66582d7
Showing 1 changed file with 25 additions and 9 deletions.
34 changes: 25 additions & 9 deletions services/hh/node_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,37 @@ func (n *NodeProcessor) Open() error {
// Close closes the NodeProcessor, terminating all data tranmission to the node.
// When closed it will not accept hinted-handoff data.
func (n *NodeProcessor) Close() error {
n.mu.Lock()
defer n.mu.Unlock()
if wait := func() bool {
n.mu.Lock()
defer n.mu.Unlock()

if n.done == nil {
// Already closed.
if n.closed() {
return false // Already closed.
}
close(n.done)
return true
}(); !wait {
return nil
}

close(n.done)
n.wg.Wait()
n.done = nil

// Release all remaining resources.
n.mu.Lock()
defer n.mu.Unlock()
n.done = nil
return n.queue.Close()
}

func (n *NodeProcessor) closed() bool {
select {
case <-n.done:
// NodeProcessor is closing.
return true
default:
}
return n.done == nil
}

// Statistics returns statistics for periodic monitoring.
func (n *NodeProcessor) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{
Expand Down Expand Up @@ -148,7 +164,7 @@ func (n *NodeProcessor) Purge() error {
n.mu.Lock()
defer n.mu.Unlock()

if n.done != nil {
if !n.closed() {
return fmt.Errorf("node processor is open")
}

Expand All @@ -161,7 +177,7 @@ func (n *NodeProcessor) WriteShard(points []models.Point) error {
n.mu.RLock()
defer n.mu.RUnlock()

if n.done == nil {
if n.closed() {
return fmt.Errorf("node processor is closed")
}

Expand Down

0 comments on commit 66582d7

Please sign in to comment.