diff --git a/opcua_plugin/browse.go b/opcua_plugin/browse.go index 53e806e..28bb904 100644 --- a/opcua_plugin/browse.go +++ b/opcua_plugin/browse.go @@ -78,16 +78,19 @@ type NodeTask struct { func browse(ctx context.Context, startNode NodeBrowser, startPath string, logger Logger, parentNodeId string, nodeChan chan NodeDef, errChan chan error, wg *TrackedWaitGroup, opcuaBrowserChan chan NodeDef, visited *sync.Map) { + var taskWg TrackedWaitGroup + var workerWg TrackedWaitGroup const numWorkers = 10 // Adjust based on your needs taskChan := make(chan NodeTask, numWorkers*10) // Start worker pool for i := 0; i < numWorkers; i++ { - wg.Add(1) - go worker(ctx, i, taskChan, nodeChan, errChan, opcuaBrowserChan, visited, logger, wg) + workerWg.Add(1) + go worker(ctx, i, taskChan, nodeChan, errChan, opcuaBrowserChan, visited, logger, &taskWg, &workerWg) } // Send initial task + taskWg.Add(1) taskChan <- NodeTask{ node: startNode, path: startPath, @@ -97,8 +100,10 @@ func browse(ctx context.Context, startNode NodeBrowser, startPath string, logger // Close task channel when all tasks are processed go func() { - wg.Wait() + taskWg.Wait() close(taskChan) + workerWg.Wait() + wg.Done() }() } @@ -111,98 +116,108 @@ func worker( opcuaBrowserChan chan NodeDef, visited *sync.Map, logger Logger, - wg *TrackedWaitGroup, + taskWg *TrackedWaitGroup, + workerWg *TrackedWaitGroup, ) { - for task := range taskChan { - // Skip if already visited or too deep - if _, exists := visited.LoadOrStore(task.node.ID(), struct{}{}); exists { - logger.Debugf("Worker %d: node %s already visited", id, task.node.ID().String()) - wg.Done() // Call Done() before continuing - continue - } - if task.level > 25 { - wg.Done() // Call Done() before continuing - continue - } + defer workerWg.Done() + for { + select { + case task, ok := <-taskChan: + if !ok { + // Channel is closed + return + } - // Err will be nil if the context is not done - if ctx.Err() != nil { - logger.Warnf("Worker %d: received cancellation signal", id) - wg.Done() // Call Done() before returning - return - } + // Skip if already visited or too deep + if _, exists := visited.LoadOrStore(task.node.ID(), struct{}{}); exists { + logger.Debugf("Worker %d: node %s already visited", id, task.node.ID().String()) + taskWg.Done() + continue + } - // Get node attributes - attrs, err := task.node.Attributes(ctx, ua.AttributeIDNodeClass, ua.AttributeIDBrowseName, - ua.AttributeIDDescription, ua.AttributeIDAccessLevel, ua.AttributeIDDataType) - if err != nil { - sendError(ctx, err, errChan, logger) - wg.Done() // Call Done() before returning - continue - } + if task.level > 25 { + taskWg.Done() + continue + } - if len(attrs) != 5 { - sendError(ctx, errors.Errorf("only got %d attr, needed 5", len(attrs)), errChan, logger) - wg.Done() // Call Done() before returning - continue - } + // Get node attributes + attrs, err := task.node.Attributes(ctx, ua.AttributeIDNodeClass, ua.AttributeIDBrowseName, + ua.AttributeIDDescription, ua.AttributeIDAccessLevel, ua.AttributeIDDataType) + if err != nil { + sendError(ctx, err, errChan, logger) + taskWg.Done() + continue + } - browseName, err := task.node.BrowseName(ctx) - if err != nil { - sendError(ctx, err, errChan, logger) - wg.Done() // Call Done() before returning - continue - } + if len(attrs) != 5 { + sendError(ctx, errors.Errorf("only got %d attr, needed 5", len(attrs)), errChan, logger) + taskWg.Done() + continue + } - newPath := sanitize(browseName.Name) - if task.path != "" { - newPath = task.path + "." + newPath - } + browseName, err := task.node.BrowseName(ctx) + if err != nil { + sendError(ctx, err, errChan, logger) + taskWg.Done() + continue + } - def := NodeDef{ - NodeID: task.node.ID(), - Path: newPath, - ParentNodeID: task.parentNodeId, - } + newPath := sanitize(browseName.Name) + if task.path != "" { + newPath = task.path + "." + newPath + } - if err := processNodeAttributes(attrs, &def, newPath, logger); err != nil { - sendError(ctx, err, errChan, logger) - wg.Done() // Call Done() before returning - continue - } + def := NodeDef{ + NodeID: task.node.ID(), + Path: newPath, + ParentNodeID: task.parentNodeId, + } - logger.Debugf("Worker %d: level %d: def.Path:%s def.NodeClass:%s\n", - id, task.level, def.Path, def.NodeClass) + if err := processNodeAttributes(attrs, &def, newPath, logger); err != nil { + sendError(ctx, err, errChan, logger) + taskWg.Done() + continue + } - // Handle browser channel - browserDef := def - browserDef.Path = join(task.path, browserDef.BrowseName) - select { - case opcuaBrowserChan <- browserDef: - default: - logger.Debugf("Worker %d: opcuaBrowserChan blocked, skipping", id) - } + logger.Debugf("\nWorker %d: level %d: def.Path:%s def.NodeClass:%s\n", + id, task.level, def.Path, def.NodeClass) + logger.Debugf("TrackedWaitGroup count: %d\n", taskWg.Count()) + logger.Debugf("WorkerWg count: %d\n", workerWg.Count()) - // Process based on node class - switch def.NodeClass { - case ua.NodeClassVariable: + // Handle browser channel + browserDef := def + browserDef.Path = join(task.path, browserDef.BrowseName) select { - case nodeChan <- def: - case <-ctx.Done(): - logger.Warnf("Worker %d: Failed to send node due to cancellation", id) - wg.Done() // Call Done() before returning - return - } - if err := browseChildren(ctx, task, def, taskChan, wg); err != nil { - sendError(ctx, err, errChan, logger) + case opcuaBrowserChan <- browserDef: + default: + logger.Debugf("Worker %d: opcuaBrowserChan blocked, skipping", id) } - case ua.NodeClassObject: - if err := browseChildren(ctx, task, def, taskChan, wg); err != nil { - sendError(ctx, err, errChan, logger) + // Process based on node class + switch def.NodeClass { + case ua.NodeClassVariable: + select { + case nodeChan <- def: + case <-ctx.Done(): + logger.Warnf("Worker %d: Failed to send node due to cancellation", id) + taskWg.Done() + return + } + if err := browseChildren(ctx, task, def, taskChan, taskWg); err != nil { + sendError(ctx, err, errChan, logger) + } + + case ua.NodeClassObject: + if err := browseChildren(ctx, task, def, taskChan, taskWg); err != nil { + sendError(ctx, err, errChan, logger) + } } + taskWg.Done() + + case <-ctx.Done(): + logger.Warnf("Worker %d: received cancellation signal", id) + return } - wg.Done() // Call Done() at the end of each successful iteration } } @@ -228,6 +243,7 @@ func browseChildren(ctx context.Context, task NodeTask, def NodeDef, taskChan ch // Queue child tasks for _, child := range children { wg.Add(1) + fmt.Printf("Adding child task to channel: %s", child.ID().String()) taskChan <- NodeTask{ node: child, path: def.Path,