Skip to content

Commit

Permalink
Merge pull request #117 from united-manufacturing-hub/eng-2364-fix-loops
Browse files Browse the repository at this point in the history


[ENG-2364] Avoid Looping of nodes that are visited already while browsing OPCUA server
  • Loading branch information
kanapuli authored Jan 29, 2025
2 parents 86150a2 + 845c9fb commit 4e410e6
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 25 deletions.
18 changes: 12 additions & 6 deletions opcua_plugin/browse.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ type Logger interface {

// Browse is a public wrapper function for the browse function
// Avoid using this function directly, use it only for testing
func Browse(ctx context.Context, n NodeBrowser, path string, level int, logger Logger, parentNodeId string, nodeChan chan NodeDef, errChan chan error, wg *TrackedWaitGroup, opcuaBrowserChan chan NodeDef) {
browse(ctx, n, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan)
func Browse(ctx context.Context, n NodeBrowser, path string, level int, logger Logger, parentNodeId string, nodeChan chan NodeDef, errChan chan error, wg *TrackedWaitGroup, opcuaBrowserChan chan NodeDef, visited *sync.Map) {
browse(ctx, n, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan, visited)
}

// browse recursively explores OPC UA nodes to build a comprehensive list of NodeDefs.
Expand Down Expand Up @@ -92,9 +92,15 @@ func Browse(ctx context.Context, n NodeBrowser, path string, level int, logger L
// - `wg` (`*sync.WaitGroup`): WaitGroup to synchronize the completion of goroutines.
// **Returns:**
// - `void`: Errors are sent through `errChan`, and discovered nodes are sent through `nodeChan`.
func browse(ctx context.Context, n NodeBrowser, path string, level int, logger Logger, parentNodeId string, nodeChan chan NodeDef, errChan chan error, wg *TrackedWaitGroup, opcuaBrowserChan chan NodeDef) {
func browse(ctx context.Context, n NodeBrowser, path string, level int, logger Logger, parentNodeId string, nodeChan chan NodeDef, errChan chan error, wg *TrackedWaitGroup, opcuaBrowserChan chan NodeDef, visited *sync.Map) {
defer wg.Done()

// Check if the current node is already visited else proceed
if _, exists := visited.LoadOrStore(n.ID(), struct{}{}); exists {
logger.Debugf("node %s is visited already, hence doing an early exit from the browse routine", n.ID().String())
return
}

// Limits browsing depth to a maximum of 25 levels in the node hierarchy.
// Performance impact is minimized since most browse operations terminate earlier
// due to other exit conditions before reaching this maximum depth.
Expand Down Expand Up @@ -287,7 +293,7 @@ func browse(ctx context.Context, n NodeBrowser, path string, level int, logger L
}
for _, child := range children {
wg.Add(1)
go browse(ctx, child, def.Path, level+1, logger, def.NodeID.String(), nodeChan, errChan, wg, opcuaBrowserChan)
go browse(ctx, child, def.Path, level+1, logger, def.NodeID.String(), nodeChan, errChan, wg, opcuaBrowserChan, visited)
}
return nil
}
Expand Down Expand Up @@ -403,7 +409,7 @@ func (g *OPCUAInput) discoverNodes(ctx context.Context) ([]NodeDef, map[string]s
g.Log.Debugf("Browsing nodeID: %s", nodeID.String())
wg.Add(1)
wrapperNodeID := NewOpcuaNodeWrapper(g.Client.Node(nodeID))
go browse(timeoutCtx, wrapperNodeID, "", 0, g.Log, nodeID.String(), nodeChan, errChan, &wg, opcuaBrowserChan)
go browse(timeoutCtx, wrapperNodeID, "", 0, g.Log, nodeID.String(), nodeChan, errChan, &wg, opcuaBrowserChan, &g.visited)
}

go func() {
Expand Down Expand Up @@ -479,7 +485,7 @@ func (g *OPCUAInput) BrowseAndSubscribeIfNeeded(ctx context.Context) error {

wgHeartbeat.Add(1)
wrapperNodeID := NewOpcuaNodeWrapper(g.Client.Node(heartbeatNodeID))
go browse(ctx, wrapperNodeID, "", 1, g.Log, heartbeatNodeID.String(), nodeHeartbeatChan, errChanHeartbeat, &wgHeartbeat, opcuaBrowserChanHeartbeat)
go browse(ctx, wrapperNodeID, "", 1, g.Log, heartbeatNodeID.String(), nodeHeartbeatChan, errChanHeartbeat, &wgHeartbeat, opcuaBrowserChanHeartbeat, &g.visited)

wgHeartbeat.Wait()
close(nodeHeartbeatChan)
Expand Down
2 changes: 1 addition & 1 deletion opcua_plugin/browse_frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (g *OPCUAInput) GetNodeTree(ctx context.Context, msgChan chan<- string, roo

var wg TrackedWaitGroup
wg.Add(1)
browse(ctx, NewOpcuaNodeWrapper(g.Client.Node(rootNode.NodeId)), "", 0, g.Log, rootNode.NodeId.String(), nodeChan, errChan, &wg, opcuaBrowserChan)
browse(ctx, NewOpcuaNodeWrapper(g.Client.Node(rootNode.NodeId)), "", 0, g.Log, rootNode.NodeId.String(), nodeChan, errChan, &wg, opcuaBrowserChan, &g.visited)
go logBrowseStatus(ctx, nodeChan, msgChan, &wg)
go logErrors(ctx, errChan, g.Log)
go collectNodes(ctx, opcuaBrowserChan, nodeIDMap, &nodes)
Expand Down
1 change: 1 addition & 0 deletions opcua_plugin/opcua.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ type OPCUAInput struct {
browseErrorChan chan error
AutoReconnect bool
ReconnectIntervalInSeconds int
visited sync.Map
}

// cleanupBrowsing ensures the browsing goroutine is properly stopped and cleaned up
Expand Down
3 changes: 2 additions & 1 deletion opcua_plugin/opcua_opc-plc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1425,11 +1425,12 @@ opcua:
errChan := make(chan error, 100)
opcuaBrowserChan := make(chan NodeDef, 100)
var wg TrackedWaitGroup
var visited sync.Map

// Browse the node
wg.Add(1)
wrapperNodeID := NewOpcuaNodeWrapper(input.Client.Node(parsedNodeIDs[0]))
go Browse(ctx, wrapperNodeID, "", 0, input.Log, parsedNodeIDs[0].String(), nodeChan, errChan, &wg, opcuaBrowserChan)
go Browse(ctx, wrapperNodeID, "", 0, input.Log, parsedNodeIDs[0].String(), nodeChan, errChan, &wg, opcuaBrowserChan, &visited)

wg.Wait()
close(nodeChan)
Expand Down
31 changes: 17 additions & 14 deletions opcua_plugin/opcua_unittest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/gopcua/opcua/id"
Expand Down Expand Up @@ -66,6 +67,7 @@ var _ = Describe("Unit Tests", func() {
errChan chan error
wg *TrackedWaitGroup
opcuaBrowserChan chan NodeDef
visited sync.Map
)
BeforeEach(func() {
ctx, cncl = context.WithTimeout(context.Background(), 180*time.Second)
Expand All @@ -80,6 +82,7 @@ var _ = Describe("Unit Tests", func() {
})
AfterEach(func() {
cncl()
visited.Clear()
})

Context("When browsing nodes with a node class value nil", func() {
Expand All @@ -100,7 +103,7 @@ var _ = Describe("Unit Tests", func() {
nodeBrowser = rootNodeWithNilNodeClass
wg.Add(1)
go func() {
Browse(ctx, nodeBrowser, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan)
Browse(ctx, nodeBrowser, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan, &visited)
}()
wg.Wait()
close(nodeChan)
Expand All @@ -127,7 +130,7 @@ var _ = Describe("Unit Tests", func() {
nodeBrowser = rootNode
wg.Add(1)
go func() {
Browse(ctx, nodeBrowser, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan)
Browse(ctx, nodeBrowser, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan, &visited)
}()
wg.Wait()
close(nodeChan)
Expand Down Expand Up @@ -165,7 +168,7 @@ var _ = Describe("Unit Tests", func() {
nodeBrowser = rootNode
wg.Add(1)
go func() {
Browse(ctx, nodeBrowser, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan)
Browse(ctx, nodeBrowser, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan, &visited)
}()
wg.Wait()
close(nodeChan)
Expand Down Expand Up @@ -205,7 +208,7 @@ var _ = Describe("Unit Tests", func() {
nodeBrowser = rootNode
wg.Add(1)
go func() {
Browse(ctx, nodeBrowser, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan)
Browse(ctx, nodeBrowser, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan, &visited)
}()
wg.Wait()
close(nodeChan)
Expand Down Expand Up @@ -245,7 +248,7 @@ var _ = Describe("Unit Tests", func() {
nodeBrowser = rootNode
wg.Add(1)
go func() {
Browse(ctx, nodeBrowser, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan)
Browse(ctx, nodeBrowser, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan, &visited)
}()
wg.Wait()
close(nodeChan)
Expand Down Expand Up @@ -288,7 +291,7 @@ var _ = Describe("Unit Tests", func() {
nodeBrowser = rootNode
wg.Add(1)
go func() {
Browse(ctx, nodeBrowser, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan)
Browse(ctx, nodeBrowser, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan, &visited)
}()
wg.Wait()
close(nodeChan)
Expand Down Expand Up @@ -364,7 +367,7 @@ var _ = Describe("Unit Tests", func() {
nodeBrowser = abcFolder
wg.Add(1)
go func() {
Browse(ctx, nodeBrowser, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan)
Browse(ctx, nodeBrowser, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan, &visited)
}()
wg.Wait()
close(nodeChan)
Expand Down Expand Up @@ -393,7 +396,7 @@ var _ = Describe("Unit Tests", func() {
childNode := createMockNode(85, "child", ua.NodeClassVariable)
rootNode.AddReferenceNode(id.HasChild, childNode)

nodes, errs := startBrowsing(ctx, rootNode, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan)
nodes, errs := startBrowsing(ctx, rootNode, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan, &visited)

Expect(errs).Should(BeEmpty())
Expect(nodes).Should(HaveLen(1))
Expand All @@ -406,7 +409,7 @@ var _ = Describe("Unit Tests", func() {
childNode := createMockNode(85, "child", ua.NodeClassVariable)
rootNode.AddReferenceNode(id.HasComponent, childNode)

nodes, errs := startBrowsing(ctx, rootNode, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan)
nodes, errs := startBrowsing(ctx, rootNode, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan, &visited)
Expect(errs).Should(BeEmpty())
Expect(nodes).Should(HaveLen(1))
Expect(nodes[0].NodeID.String()).To(Equal("i=85"))
Expand All @@ -418,7 +421,7 @@ var _ = Describe("Unit Tests", func() {
childNode := createMockNode(85, "child", ua.NodeClassVariable)
rootNode.AddReferenceNode(id.Organizes, childNode)

nodes, errs := startBrowsing(ctx, rootNode, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan)
nodes, errs := startBrowsing(ctx, rootNode, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan, &visited)
Expect(errs).Should(BeEmpty())
Expect(nodes).Should(HaveLen(1))
Expect(nodes[0].NodeID.String()).To(Equal("i=85"))
Expand All @@ -430,7 +433,7 @@ var _ = Describe("Unit Tests", func() {
childNode := createMockNode(85, "child", ua.NodeClassVariable)
rootNode.AddReferenceNode(id.FolderType, childNode)

nodes, errs := startBrowsing(ctx, rootNode, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan)
nodes, errs := startBrowsing(ctx, rootNode, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan, &visited)
Expect(errs).Should(BeEmpty())
Expect(nodes).Should(HaveLen(1))
Expect(nodes[0].NodeID.String()).To(Equal("i=85"))
Expand All @@ -442,7 +445,7 @@ var _ = Describe("Unit Tests", func() {
childNode := createMockNode(85, "child", ua.NodeClassVariable)
rootNode.AddReferenceNode(id.HasNotifier, childNode)

nodes, errs := startBrowsing(ctx, rootNode, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan)
nodes, errs := startBrowsing(ctx, rootNode, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan, &visited)
Expect(errs).Should(BeEmpty())
Expect(nodes).Should(HaveLen(1))
Expect(nodes[0].NodeID.String()).To(Equal("i=85"))
Expand Down Expand Up @@ -686,10 +689,10 @@ func createMockNode(id uint32, name string, nodeClass ua.NodeClass) *MockOpcuaNo
// Ensure that the MockOpcuaNodeWraper implements the NodeBrowser interface
var _ NodeBrowser = &MockOpcuaNodeWraper{}

func startBrowsing(ctx context.Context, rootNode NodeBrowser, path string, level int, logger Logger, parentNodeId string, nodeChan chan NodeDef, errChan chan error, wg *TrackedWaitGroup, opcuaBrowserChan chan NodeDef) ([]NodeDef, []error) {
func startBrowsing(ctx context.Context, rootNode NodeBrowser, path string, level int, logger Logger, parentNodeId string, nodeChan chan NodeDef, errChan chan error, wg *TrackedWaitGroup, opcuaBrowserChan chan NodeDef, visited *sync.Map) ([]NodeDef, []error) {
wg.Add(1)
go func() {
Browse(ctx, rootNode, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan)
Browse(ctx, rootNode, path, level, logger, parentNodeId, nodeChan, errChan, wg, opcuaBrowserChan, visited)
}()
wg.Wait()
close(nodeChan)
Expand Down
6 changes: 3 additions & 3 deletions opcua_plugin/server_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func (g *OPCUAInput) GetOPCUAServerInformation(ctx context.Context) (ServerInfo,
var wg TrackedWaitGroup

wg.Add(3)
go browse(ctx, NewOpcuaNodeWrapper(g.Client.Node(manufacturerNameNodeID)), "", 0, g.Log, manufacturerNameNodeID.String(), nodeChan, errChan, &wg, opcuaBrowserChan)
go browse(ctx, NewOpcuaNodeWrapper(g.Client.Node(productNameNodeID)), "", 0, g.Log, productNameNodeID.String(), nodeChan, errChan, &wg, opcuaBrowserChan)
go browse(ctx, NewOpcuaNodeWrapper(g.Client.Node(softwareVersionNodeID)), "", 0, g.Log, softwareVersionNodeID.String(), nodeChan, errChan, &wg, opcuaBrowserChan)
go browse(ctx, NewOpcuaNodeWrapper(g.Client.Node(manufacturerNameNodeID)), "", 0, g.Log, manufacturerNameNodeID.String(), nodeChan, errChan, &wg, opcuaBrowserChan, &g.visited)
go browse(ctx, NewOpcuaNodeWrapper(g.Client.Node(productNameNodeID)), "", 0, g.Log, productNameNodeID.String(), nodeChan, errChan, &wg, opcuaBrowserChan, &g.visited)
go browse(ctx, NewOpcuaNodeWrapper(g.Client.Node(softwareVersionNodeID)), "", 0, g.Log, softwareVersionNodeID.String(), nodeChan, errChan, &wg, opcuaBrowserChan, &g.visited)
wg.Wait()

close(nodeChan)
Expand Down

0 comments on commit 4e410e6

Please sign in to comment.