From 8144e6cf2c1dfed2b3d04231f547103fca847daf Mon Sep 17 00:00:00 2001 From: Jeremy Theocharis Date: Wed, 18 Sep 2024 15:01:53 +0000 Subject: [PATCH] fix: added exception for prosys --- opcua_plugin/opcua.go | 183 ++++++++++++++++++++++----- opcua_plugin/opcua_simulator_test.go | 78 +++++++----- 2 files changed, 198 insertions(+), 63 deletions(-) diff --git a/opcua_plugin/opcua.go b/opcua_plugin/opcua.go index 31f75eb..4eb783f 100644 --- a/opcua_plugin/opcua.go +++ b/opcua_plugin/opcua.go @@ -435,6 +435,7 @@ type OPCUAInput struct { HeartbeatManualSubscribed bool HeartbeatNodeId *ua.NodeID Subscription *opcua.Subscription + ServerInfo ServerInfo } // UpdateNodePaths updates the node paths to use the nodeID instead of the browseName @@ -576,6 +577,44 @@ func (g *OPCUAInput) createMessageFromValue(dataValue *ua.DataValue, nodeDef Nod return message } +func (g *OPCUAInput) Read(ctx context.Context, req *ua.ReadRequest) (*ua.ReadResponse, error) { + resp, err := g.Client.Read(ctx, req) + if err != nil { + g.Log.Errorf("Read failed: %s", err) + // if the error is StatusBadSessionIDInvalid, the session has been closed, and we need to reconnect. + switch { + case errors.Is(err, ua.StatusBadSessionIDInvalid): + _ = g.Close(ctx) + return nil, service.ErrNotConnected + case errors.Is(err, ua.StatusBadCommunicationError): + _ = g.Close(ctx) + return nil, service.ErrNotConnected + case errors.Is(err, ua.StatusBadConnectionClosed): + _ = g.Close(ctx) + return nil, service.ErrNotConnected + case errors.Is(err, ua.StatusBadTimeout): + _ = g.Close(ctx) + return nil, service.ErrNotConnected + case errors.Is(err, ua.StatusBadConnectionRejected): + _ = g.Close(ctx) + return nil, service.ErrNotConnected + case errors.Is(err, ua.StatusBadServerNotConnected): + _ = g.Close(ctx) + return nil, service.ErrNotConnected + } + + // return error and stop executing this function. + return nil, err + } + + if !errors.Is(resp.Results[0].Status, ua.StatusOK) { + g.Log.Errorf("Status not OK: %v", resp.Results[0].Status) + return nil, fmt.Errorf("status not OK: %v", resp.Results[0].Status) + } + + return resp, nil +} + func (g *OPCUAInput) ReadBatchPull(ctx context.Context) (service.MessageBatch, service.AckFunc, error) { if g.Client == nil { return nil, nil, errors.New("client is nil") @@ -601,37 +640,9 @@ func (g *OPCUAInput) ReadBatchPull(ctx context.Context) (service.MessageBatch, s TimestampsToReturn: ua.TimestampsToReturnBoth, } - resp, err := g.Client.Read(ctx, req) + resp, err := g.Read(ctx, req) if err != nil { g.Log.Errorf("Read failed: %s", err) - // if the error is StatusBadSessionIDInvalid, the session has been closed, and we need to reconnect. - switch { - case errors.Is(err, ua.StatusBadSessionIDInvalid): - _ = g.Close(ctx) - return nil, nil, service.ErrNotConnected - case errors.Is(err, ua.StatusBadCommunicationError): - _ = g.Close(ctx) - return nil, nil, service.ErrNotConnected - case errors.Is(err, ua.StatusBadConnectionClosed): - _ = g.Close(ctx) - return nil, nil, service.ErrNotConnected - case errors.Is(err, ua.StatusBadTimeout): - _ = g.Close(ctx) - return nil, nil, service.ErrNotConnected - case errors.Is(err, ua.StatusBadConnectionRejected): - _ = g.Close(ctx) - return nil, nil, service.ErrNotConnected - case errors.Is(err, ua.StatusBadServerNotConnected): - _ = g.Close(ctx) - return nil, nil, service.ErrNotConnected - } - - // return error and stop executing this function. - return nil, nil, err - } - - if !errors.Is(resp.Results[0].Status, ua.StatusOK) { - g.Log.Errorf("Status not OK: %v", resp.Results[0].Status) } // Create a message with the node's path as the metadata @@ -745,7 +756,11 @@ func (g *OPCUAInput) ReadBatch(ctx context.Context) (msgs service.MessageBatch, _ = g.Close(ctx) return nil, nil, service.ErrNotConnected } else { - g.Log.Warn("No heartbeat message (ServerTime) received for over 10 seconds. This can be normal for certain OPC UA servers (e.g., Prosys OPC UA Simulation). Other messages are being received; continuing operations.") + if g.ServerInfo.ManufacturerName == "Prosys OPC Ltd." { + g.Log.Info("No heartbeat message (ServerTime) received for over 10 seconds. This is normal for your Prosys OPC UA server. Other messages are being received; continuing operations. ") + } else { + g.Log.Warn("No heartbeat message (ServerTime) received for over 10 seconds. Other messages are being received; continuing operations.") + } } } @@ -1026,7 +1041,17 @@ func (g *OPCUAInput) Connect(ctx context.Context) error { } g.Log.Infof("Connected to %s", g.Endpoint) - g.Log.Infof("Please note that browsing large node trees can take a long time (around 5 nodes per second)") + + // Get OPC UA server information + serverInfo, err := g.GetOPCUAServerInformation(ctx) + if err != nil { + g.Log.Warnf("Failed to get OPC UA server information: %s", err) + } else { + g.Log.Infof("OPC UA Server Information: %v+", serverInfo) + g.ServerInfo = serverInfo + } + + g.Log.Infof("Please note that browsing large node trees can take some time") g.Client = c @@ -1040,6 +1065,102 @@ func (g *OPCUAInput) Connect(ctx context.Context) error { return nil } +type ServerInfo struct { + ManufacturerName string + ProductName string + SoftwareVersion string +} + +// GetOPCUAServerInformation retrieves the server information from the OPC UA server +// It is available as i=2295 +func (g *OPCUAInput) GetOPCUAServerInformation(ctx context.Context) (ServerInfo, error) { + + if g.Client == nil { + return ServerInfo{}, errors.New("client is nil") + } + // Fetch ManufacturerName node from i=2263 + manufacturerNameNodeID := ua.NewNumericNodeID(0, 2263) + productNameNodeID := ua.NewNumericNodeID(0, 2261) + softwareVersionNodeID := ua.NewNumericNodeID(0, 2264) + + nodeChan := make(chan NodeDef, 3) + var wg sync.WaitGroup + errChan := make(chan error, 3) + + wg.Add(3) + go browse(ctx, g.Client.Node(manufacturerNameNodeID), "", 0, g.Log, manufacturerNameNodeID.String(), nodeChan, errChan, &wg) + go browse(ctx, g.Client.Node(productNameNodeID), "", 0, g.Log, productNameNodeID.String(), nodeChan, errChan, &wg) + go browse(ctx, g.Client.Node(softwareVersionNodeID), "", 0, g.Log, softwareVersionNodeID.String(), nodeChan, errChan, &wg) + wg.Wait() + + close(nodeChan) + close(errChan) + + if len(errChan) > 0 { + return ServerInfo{}, <-errChan + } + + var nodeList []NodeDef + for node := range nodeChan { + nodeList = append(nodeList, node) + } + + if len(nodeList) != 3 { + g.Log.Warn("Could not find OPC UA Server Information") + return ServerInfo{}, errors.New("could not find OPC UA Server Information") + } + + var nodesToRead []*ua.ReadValueID + for _, node := range nodeList { + nodesToRead = append(nodesToRead, &ua.ReadValueID{ + NodeID: node.NodeID, + }) + } + + req := &ua.ReadRequest{ + MaxAge: 2000, + NodesToRead: nodesToRead, + TimestampsToReturn: ua.TimestampsToReturnBoth, + } + + resp, err := g.Read(ctx, req) + if err != nil { + g.Log.Errorf("Read failed: %s", err) + return ServerInfo{}, err + } + + if len(resp.Results) != 3 { + g.Log.Errorf("Expected 3 results, got %d", len(resp.Results)) + return ServerInfo{}, errors.New("expected 3 results") + } + + serverInfo := ServerInfo{} + + for i, node := range nodeList { + value := resp.Results[i] + if value == nil || value.Value == nil { + g.Log.Warnf("Received nil in item structure for OPC UA Server Information") + } + + message := g.createMessageFromValue(value, node) + if message != nil { + messageBytes, err := message.AsBytes() + if err != nil { + return ServerInfo{}, err + } + + if node.NodeID.IntID() == 2263 { + serverInfo.ManufacturerName = string(messageBytes) + } else if node.NodeID.IntID() == 2261 { + serverInfo.ProductName = string(messageBytes) + } else if node.NodeID.IntID() == 2264 { + serverInfo.SoftwareVersion = string(messageBytes) + } + } + } + return serverInfo, nil +} + func (g *OPCUAInput) BrowseAndSubscribeIfNeeded(ctx context.Context) error { // Create a slice to store the detected nodes diff --git a/opcua_plugin/opcua_simulator_test.go b/opcua_plugin/opcua_simulator_test.go index b39bea0..c8c13c1 100644 --- a/opcua_plugin/opcua_simulator_test.go +++ b/opcua_plugin/opcua_simulator_test.go @@ -24,17 +24,56 @@ var _ = Describe("Test Against Prosys Simulator", func() { var endpoint string - Describe("YAML Configuration", func() { - BeforeEach(func() { - endpoint = os.Getenv("TEST_PROSYS_ENDPOINT_URI") + BeforeEach(func() { + endpoint = os.Getenv("TEST_PROSYS_ENDPOINT_URI") + + // Check if environment variables are set + if endpoint == "" { + Skip("Skipping test: environment variables not set") + return + } + + }) + + Describe("OPC UA Server Information", func() { + + It("should connect to the server and retrieve server information", func() { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + var nodeIDStrings = []string{"ns=3;i=1003"} + parsedNodeIDs := ParseNodeIDs(nodeIDStrings) - // Check if environment variables are set - if endpoint == "" { - Skip("Skipping test: environment variables not set") - return + input := &OPCUAInput{ + Endpoint: endpoint, + Username: "", + Password: "", + NodeIDs: parsedNodeIDs, + SecurityMode: "None", + SecurityPolicy: "None", } + // Attempt to connect + err := input.Connect(ctx) + Expect(err).NotTo(HaveOccurred()) + + serverInformation, err := input.GetOPCUAServerInformation(ctx) + Expect(err).NotTo(HaveOccurred()) + + GinkgoWriter.Printf("Server Information: \n") + GinkgoWriter.Printf("ManufacturerName: %s\n", serverInformation.ManufacturerName) + GinkgoWriter.Printf("ProductName: %s\n", serverInformation.ProductName) + GinkgoWriter.Printf("SoftwareVersion: %s\n", serverInformation.SoftwareVersion) + + // Close connection + if input.Client != nil { + err = input.Close(ctx) + Expect(err).NotTo(HaveOccurred()) + } }) + }) + + Describe("YAML Configuration", func() { When("using a yaml and stream builder", func() { @@ -67,7 +106,6 @@ opcua: var count int64 err = builder.AddConsumerFunc(func(c context.Context, m *service.Message) error { atomic.AddInt64(&count, 1) - GinkgoWriter.Printf("Received message: %+v\n", m) return err }) @@ -99,16 +137,6 @@ opcua: var endpoint string - BeforeEach(func() { - endpoint = os.Getenv("TEST_PROSYS_ENDPOINT_URI") - - // Check if environment variables are set - if endpoint == "" { - Skip("Skipping test: environment variables not set") - return - } - - }) It("should read data correctly", func() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -153,20 +181,6 @@ opcua: Describe("Secure (SignAndEncrypt/Basic256Sha256) Connect", func() { - var endpoint string - - BeforeEach(func() { - Skip("Skipping test: prosys will reject all unknown certificates") - - endpoint = os.Getenv("TEST_PROSYS_ENDPOINT_URI") - - // Check if environment variables are set - if endpoint == "" { - Skip("Skipping test: environment variables not set") - return - } - - }) It("should read data correctly", func() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel()