Skip to content

Commit

Permalink
Fix sync.WaitGroup mismatch
Browse files Browse the repository at this point in the history
  • Loading branch information
kanapuli committed Jan 31, 2025
1 parent 875f6c1 commit 9a25eef
Showing 1 changed file with 95 additions and 79 deletions.
174 changes: 95 additions & 79 deletions opcua_plugin/browse.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,19 @@ type NodeTask struct {
func browse(ctx context.Context, startNode NodeBrowser, startPath string, logger Logger, parentNodeId string,
nodeChan chan NodeDef, errChan chan error, wg *TrackedWaitGroup, opcuaBrowserChan chan NodeDef, visited *sync.Map) {

var taskWg TrackedWaitGroup
var workerWg TrackedWaitGroup
const numWorkers = 10 // Adjust based on your needs
taskChan := make(chan NodeTask, numWorkers*10)

// Start worker pool
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go worker(ctx, i, taskChan, nodeChan, errChan, opcuaBrowserChan, visited, logger, wg)
workerWg.Add(1)
go worker(ctx, i, taskChan, nodeChan, errChan, opcuaBrowserChan, visited, logger, &taskWg, &workerWg)
}

// Send initial task
taskWg.Add(1)
taskChan <- NodeTask{
node: startNode,
path: startPath,
Expand All @@ -97,8 +100,10 @@ func browse(ctx context.Context, startNode NodeBrowser, startPath string, logger

// Close task channel when all tasks are processed
go func() {
wg.Wait()
taskWg.Wait()
close(taskChan)
workerWg.Wait()
wg.Done()
}()
}

Expand All @@ -111,98 +116,108 @@ func worker(
opcuaBrowserChan chan NodeDef,
visited *sync.Map,
logger Logger,
wg *TrackedWaitGroup,
taskWg *TrackedWaitGroup,
workerWg *TrackedWaitGroup,
) {
for task := range taskChan {
// Skip if already visited or too deep
if _, exists := visited.LoadOrStore(task.node.ID(), struct{}{}); exists {
logger.Debugf("Worker %d: node %s already visited", id, task.node.ID().String())
wg.Done() // Call Done() before continuing
continue
}
if task.level > 25 {
wg.Done() // Call Done() before continuing
continue
}
defer workerWg.Done()
for {
select {
case task, ok := <-taskChan:
if !ok {
// Channel is closed
return
}

// Err will be nil if the context is not done
if ctx.Err() != nil {
logger.Warnf("Worker %d: received cancellation signal", id)
wg.Done() // Call Done() before returning
return
}
// Skip if already visited or too deep
if _, exists := visited.LoadOrStore(task.node.ID(), struct{}{}); exists {
logger.Debugf("Worker %d: node %s already visited", id, task.node.ID().String())
taskWg.Done()
continue
}

// Get node attributes
attrs, err := task.node.Attributes(ctx, ua.AttributeIDNodeClass, ua.AttributeIDBrowseName,
ua.AttributeIDDescription, ua.AttributeIDAccessLevel, ua.AttributeIDDataType)
if err != nil {
sendError(ctx, err, errChan, logger)
wg.Done() // Call Done() before returning
continue
}
if task.level > 25 {
taskWg.Done()
continue
}

if len(attrs) != 5 {
sendError(ctx, errors.Errorf("only got %d attr, needed 5", len(attrs)), errChan, logger)
wg.Done() // Call Done() before returning
continue
}
// Get node attributes
attrs, err := task.node.Attributes(ctx, ua.AttributeIDNodeClass, ua.AttributeIDBrowseName,
ua.AttributeIDDescription, ua.AttributeIDAccessLevel, ua.AttributeIDDataType)
if err != nil {
sendError(ctx, err, errChan, logger)
taskWg.Done()
continue
}

browseName, err := task.node.BrowseName(ctx)
if err != nil {
sendError(ctx, err, errChan, logger)
wg.Done() // Call Done() before returning
continue
}
if len(attrs) != 5 {
sendError(ctx, errors.Errorf("only got %d attr, needed 5", len(attrs)), errChan, logger)
taskWg.Done()
continue
}

newPath := sanitize(browseName.Name)
if task.path != "" {
newPath = task.path + "." + newPath
}
browseName, err := task.node.BrowseName(ctx)
if err != nil {
sendError(ctx, err, errChan, logger)
taskWg.Done()
continue
}

def := NodeDef{
NodeID: task.node.ID(),
Path: newPath,
ParentNodeID: task.parentNodeId,
}
newPath := sanitize(browseName.Name)
if task.path != "" {
newPath = task.path + "." + newPath
}

if err := processNodeAttributes(attrs, &def, newPath, logger); err != nil {
sendError(ctx, err, errChan, logger)
wg.Done() // Call Done() before returning
continue
}
def := NodeDef{
NodeID: task.node.ID(),
Path: newPath,
ParentNodeID: task.parentNodeId,
}

logger.Debugf("Worker %d: level %d: def.Path:%s def.NodeClass:%s\n",
id, task.level, def.Path, def.NodeClass)
if err := processNodeAttributes(attrs, &def, newPath, logger); err != nil {
sendError(ctx, err, errChan, logger)
taskWg.Done()
continue
}

// Handle browser channel
browserDef := def
browserDef.Path = join(task.path, browserDef.BrowseName)
select {
case opcuaBrowserChan <- browserDef:
default:
logger.Debugf("Worker %d: opcuaBrowserChan blocked, skipping", id)
}
logger.Debugf("\nWorker %d: level %d: def.Path:%s def.NodeClass:%s\n",
id, task.level, def.Path, def.NodeClass)
logger.Debugf("TrackedWaitGroup count: %d\n", taskWg.Count())
logger.Debugf("WorkerWg count: %d\n", workerWg.Count())

// Process based on node class
switch def.NodeClass {
case ua.NodeClassVariable:
// Handle browser channel
browserDef := def
browserDef.Path = join(task.path, browserDef.BrowseName)
select {
case nodeChan <- def:
case <-ctx.Done():
logger.Warnf("Worker %d: Failed to send node due to cancellation", id)
wg.Done() // Call Done() before returning
return
}
if err := browseChildren(ctx, task, def, taskChan, wg); err != nil {
sendError(ctx, err, errChan, logger)
case opcuaBrowserChan <- browserDef:
default:
logger.Debugf("Worker %d: opcuaBrowserChan blocked, skipping", id)
}

case ua.NodeClassObject:
if err := browseChildren(ctx, task, def, taskChan, wg); err != nil {
sendError(ctx, err, errChan, logger)
// Process based on node class
switch def.NodeClass {
case ua.NodeClassVariable:
select {
case nodeChan <- def:
case <-ctx.Done():
logger.Warnf("Worker %d: Failed to send node due to cancellation", id)
taskWg.Done()
return
}
if err := browseChildren(ctx, task, def, taskChan, taskWg); err != nil {
sendError(ctx, err, errChan, logger)
}

case ua.NodeClassObject:
if err := browseChildren(ctx, task, def, taskChan, taskWg); err != nil {
sendError(ctx, err, errChan, logger)
}
}
taskWg.Done()

case <-ctx.Done():
logger.Warnf("Worker %d: received cancellation signal", id)
return
}
wg.Done() // Call Done() at the end of each successful iteration
}
}

Expand All @@ -228,6 +243,7 @@ func browseChildren(ctx context.Context, task NodeTask, def NodeDef, taskChan ch
// Queue child tasks
for _, child := range children {
wg.Add(1)
fmt.Printf("Adding child task to channel: %s", child.ID().String())
taskChan <- NodeTask{
node: child,
path: def.Path,
Expand Down

0 comments on commit 9a25eef

Please sign in to comment.