diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index e0443567..0a681b29 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -38,28 +38,16 @@ jobs: go_version: ${{ env.GO_VERSION }} - name: Startup simulator in background run: | - docker-compose up & + docker-compose up -d working-directory: tests - - name: Wait for port 46010 to be ready + - name: Wait for port 50000 to be ready run: | - while ! nc -z localhost 46010; do - echo "Waiting for port 46010 to be ready..." + while ! nc -z localhost 50000; do + echo "Waiting for port 50000 to be ready..." sleep 1 done - name: Test run: make test - go-test-wago: - runs-on: hercules - timeout-minutes: 30 - steps: - - name: Checkout - uses: actions/checkout@v3 - - name: Setup Go - uses: ./.github/actions/setup-go - with: - go_version: ${{ env.GO_VERSION }} - - name: Test - run: TEST_WAGO_ENDPOINT_URI=${{ secrets.TEST_WAGO_ENDPOINT_URI }} TEST_WAGO_USERNAME=${{ secrets.TEST_WAGO_USERNAME }} TEST_WAGO_PASSWORD=${{ secrets.TEST_WAGO_PASSWORD }} make test go-build: runs-on: group: large-runners diff --git a/.github/workflows/wago.yml b/.github/workflows/wago.yml new file mode 100644 index 00000000..2b850184 --- /dev/null +++ b/.github/workflows/wago.yml @@ -0,0 +1,43 @@ +# Copyright 2023 UMH Systems GmbH +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +name: main + +on: + push: + branches: + - '**' +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }} + GO_VERSION: '1.21.*' + +concurrency: + group: wago-test + cancel-in-progress: true + +jobs: + go-test-wago: + runs-on: hercules + timeout-minutes: 30 + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Setup Go + uses: ./.github/actions/setup-go + with: + go_version: ${{ env.GO_VERSION }} + - name: Test + run: TEST_WAGO_ENDPOINT_URI=${{ secrets.TEST_WAGO_ENDPOINT_URI }} TEST_WAGO_USERNAME=${{ secrets.TEST_WAGO_USERNAME }} TEST_WAGO_PASSWORD=${{ secrets.TEST_WAGO_PASSWORD }} make test diff --git a/Makefile b/Makefile index 5848b074..a2ebf242 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,7 @@ target: @goreleaser build --single-target --snapshot --id benthos \ --output ./tmp/bin/benthos test: - @go test ./... + @go test -v ./... lint: @golangci-lint run diff --git a/README.md b/README.md index 0832e480..a0ba6cc3 100644 --- a/README.md +++ b/README.md @@ -123,6 +123,52 @@ spec: name: benthos-1-config ``` +### Capabilities +The plugin is designed to browse and subscribe to all child nodes within a folder for each configured NodeID, provided that the NodeID represents a folder. It features a recursion depth of up to 10 levels, enabling thorough exploration of nested folder structures. The browsing specifically targets nodes organized under the OPC UA 'Organizes' relationship type, intentionally excluding nodes under 'HasProperty' and 'HasComponent' relationships. Additionally, the plugin does not browse Objects represented by red, blue, or green cube icons in UAExpert. + +Subscriptions are selectively managed, with tags having a DataType of null being excluded from subscription. Also, by default, the plugin does not subscribe to the properties of a tag, such as minimum and maximum values. + +#### Datatypes +The plugin has been rigorously tested with an array of datatypes, both as single values and as arrays. The following datatypes have been verified for compatibility: + +- `Boolean` +- `Byte` +- `DateTime` +- `Double` +- `Enumeration` +- `ExpandedNodeId` +- `Float` +- `Guid` +- `Int16` +- `Int32` +- `Int64` +- `Integer` +- `LocalizedText` +- `NodeId` +- `Number` +- `QualifiedName` +- `SByte` +- `StatusCode` +- `String` +- `UInt16` +- `UInt32` +- `UInt64` +- `UInteger` +- `ByteArray` +- `ByteString` +- `Duration` +- `LocaleId` +- `UtcTime` +- `Variant` +- `XmlElement` + +There are specific datatypes which are currently not supported by the plugin and attempting to use them will result in errors. These include: + +- Two-dimensional arrays +- UA Extension Objects +- Variant arrays (Arrays with multiple different datatypes) + + ### Authentication and Security In benthos-umh, security and authentication are designed to be as robust as possible while maintaining flexibility. The software automates the process of selecting the highest level of security offered by an OPC-UA server for the selected Authentication Method, but the user can specify their own Security Policy / Security Mode if they want (see further below at Configuration options) diff --git a/plugin/generate_cert.go b/plugin/generate_cert.go new file mode 100644 index 00000000..1f7b1013 --- /dev/null +++ b/plugin/generate_cert.go @@ -0,0 +1,109 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Generate a self-signed X.509 certificate for a TLS server. Outputs to +// 'cert.pem' and 'key.pem' and will overwrite existing files. + +// Based on src/crypto/tls/generate_cert.go from the Go SDK +// Modified by the Gopcua Authors for use in creating an OPC-UA compliant client certificate + +package plugin + +import ( + "crypto/ecdsa" + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "fmt" + "math/big" + "net" + "net/url" + "os" + "strings" + "time" +) + +func GenerateCert(host string, rsaBits int, validFor time.Duration) (certPEM, keyPEM []byte, err error) { + if len(host) == 0 { + return nil, nil, fmt.Errorf("missing required host parameter") + } + if rsaBits == 0 { + rsaBits = 2048 + } + + priv, err := rsa.GenerateKey(rand.Reader, rsaBits) + if err != nil { + return nil, nil, fmt.Errorf("failed to generate private key: %s", err) + } + + notBefore := time.Now() + notAfter := notBefore.Add(validFor) + + serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128) + serialNumber, err := rand.Int(rand.Reader, serialNumberLimit) + if err != nil { + return nil, nil, fmt.Errorf("failed to generate serial number: %s", err) + } + + template := x509.Certificate{ + SerialNumber: serialNumber, + Subject: pkix.Name{ + Organization: []string{"Gopcua Test Client"}, + }, + NotBefore: notBefore, + NotAfter: notAfter, + + KeyUsage: x509.KeyUsageContentCommitment | x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageDataEncipherment | x509.KeyUsageCertSign, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth}, + BasicConstraintsValid: true, + } + + hosts := strings.Split(host, ",") + for _, h := range hosts { + if ip := net.ParseIP(h); ip != nil { + template.IPAddresses = append(template.IPAddresses, ip) + } else { + template.DNSNames = append(template.DNSNames, h) + } + if uri, err := url.Parse(h); err == nil { + template.URIs = append(template.URIs, uri) + } + } + + derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, publicKey(priv), priv) + if err != nil { + return nil, nil, fmt.Errorf("failed to create certificate: %s", err) + } + + return pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: derBytes}), pem.EncodeToMemory(pemBlockForKey(priv)), nil +} + +func publicKey(priv interface{}) interface{} { + switch k := priv.(type) { + case *rsa.PrivateKey: + return &k.PublicKey + case *ecdsa.PrivateKey: + return &k.PublicKey + default: + return nil + } +} + +func pemBlockForKey(priv interface{}) *pem.Block { + switch k := priv.(type) { + case *rsa.PrivateKey: + return &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(k)} + case *ecdsa.PrivateKey: + b, err := x509.MarshalECPrivateKey(k) + if err != nil { + fmt.Fprintf(os.Stderr, "Unable to marshal ECDSA private key: %v", err) + os.Exit(2) + } + return &pem.Block{Type: "EC PRIVATE KEY", Bytes: b} + default: + return nil + } +} diff --git a/plugin/opcua.go b/plugin/opcua.go index 3e02ca24..d72ac48d 100644 --- a/plugin/opcua.go +++ b/plugin/opcua.go @@ -80,6 +80,8 @@ func browse(ctx context.Context, n *opcua.Node, path string, level int, logger * switch err := attrs[0].Status; err { case ua.StatusOK: def.NodeClass = ua.NodeClass(attrs[0].Value.Int()) + case ua.StatusBadSecurityModeInsufficient: + return nil, nil default: return nil, err } @@ -87,6 +89,8 @@ func browse(ctx context.Context, n *opcua.Node, path string, level int, logger * switch err := attrs[1].Status; err { case ua.StatusOK: def.BrowseName = attrs[1].Value.String() + case ua.StatusBadSecurityModeInsufficient: + return nil, nil default: return nil, err } @@ -96,6 +100,8 @@ func browse(ctx context.Context, n *opcua.Node, path string, level int, logger * def.Description = attrs[2].Value.String() case ua.StatusBadAttributeIDInvalid: // ignore + case ua.StatusBadSecurityModeInsufficient: + return nil, nil default: return nil, err } @@ -106,6 +112,8 @@ func browse(ctx context.Context, n *opcua.Node, path string, level int, logger * def.Writable = def.AccessLevel&ua.AccessLevelTypeCurrentWrite == ua.AccessLevelTypeCurrentWrite case ua.StatusBadAttributeIDInvalid: // ignore + case ua.StatusBadSecurityModeInsufficient: + return nil, nil default: return nil, err } @@ -142,6 +150,8 @@ func browse(ctx context.Context, n *opcua.Node, path string, level int, logger * } case ua.StatusBadAttributeIDInvalid: // ignore + case ua.StatusBadSecurityModeInsufficient: + return nil, nil default: return nil, err } @@ -170,15 +180,21 @@ func browse(ctx context.Context, n *opcua.Node, path string, level int, logger * return nil } - if err := browseChildren(id.HasComponent); err != nil { - return nil, err - } + /* + if err := browseChildren(id.HasComponent); err != nil { + return nil, err + } + */ + // only browse folders so far, don't browse the properties automatically if err := browseChildren(id.Organizes); err != nil { return nil, err } - if err := browseChildren(id.HasProperty); err != nil { - return nil, err - } + // For hasProperty it makes sense to show it very close to the tag itself, e.g., use the tagName as tagGroup and then the properties as subparts of it + /* + if err := browseChildren(id.HasProperty); err != nil { + return nil, err + } + */ return nodes, nil } @@ -438,7 +454,6 @@ func (g *OPCUAInput) Connect(ctx context.Context) error { g.log.Errorf("Failed to connect") return err } - defer c.Close(ctx) // ensure that if something fails here, the connection is always safely closed g.log.Infof("Connected to %s", g.endpoint) g.log.Infof("Please note that browsing large node trees can take a long time (around 5 nodes per second)") @@ -461,6 +476,7 @@ func (g *OPCUAInput) Connect(ctx context.Context) error { nodes, err := browse(ctx, g.client.Node(id), "", 0, g.log) if err != nil { g.log.Errorf("Browsing failed: %s") + c.Close(ctx) // ensure that if something fails here, the connection is always safely closed return err } @@ -471,6 +487,7 @@ func (g *OPCUAInput) Connect(ctx context.Context) error { b, err := json.Marshal(nodeList) if err != nil { g.log.Errorf("Unmarshalling failed: %s") + c.Close(ctx) // ensure that if something fails here, the connection is always safely closed return err } @@ -489,6 +506,7 @@ func (g *OPCUAInput) Connect(ctx context.Context) error { }, g.subNotifyChan) if err != nil { g.log.Errorf("Subscribing failed: %s") + c.Close(ctx) // ensure that if something fails here, the connection is always safely closed return err } @@ -499,16 +517,28 @@ func (g *OPCUAInput) Connect(ctx context.Context) error { monitoredRequests = append(monitoredRequests, miCreateRequest) } + if len(nodeList) == 0 { + g.log.Errorf("Did not subscribe to any nodes. This can happen if the nodes that are selected are incompatible with this benthos version. Aborting...") + return fmt.Errorf("no valid nodes selected") + } + res, err := sub.Monitor(ctx, ua.TimestampsToReturnBoth, monitoredRequests...) if err != nil { g.log.Errorf("Monitoring failed: %s") + c.Close(ctx) // ensure that if something fails here, the connection is always safely closed return err } + if res == nil { + g.log.Errorf("Expected res to not be nil, if there is no error") + c.Close(ctx) // ensure that if something fails here, the connection is always safely closed + return fmt.Errorf("expected res to be not nil") + } // Assuming you want to check the status code of each result for _, result := range res.Results { - if result.StatusCode != ua.StatusOK { + if !errors.Is(result.StatusCode, ua.StatusOK) { g.log.Errorf("Monitoring failed with status code: %v", result.StatusCode) + c.Close(ctx) // ensure that if something fails here, the connection is always safely closed return fmt.Errorf("monitoring failed for node, status code: %v", result.StatusCode) } } @@ -520,11 +550,17 @@ func (g *OPCUAInput) Connect(ctx context.Context) error { return nil } -// TODO: adjust with TypeID in opcua.enums.go -func (g *OPCUAInput) createMessageFromValue(value interface{}, nodeID string) *service.Message { +// createMessageFromValue creates a benthos messages from a given variant and nodeID +// theoretically nodeID can be extracted from variant, but not in all cases (e.g., when subscribing), so it it left to the calling function +func (g *OPCUAInput) createMessageFromValue(variant *ua.Variant, nodeID string) *service.Message { + if variant == nil { + g.log.Errorf("Variant is nil") + return nil + } + b := make([]byte, 0) - switch v := value.(type) { + switch v := variant.Value().(type) { case float32: b = append(b, []byte(strconv.FormatFloat(float64(v), 'f', -1, 32))...) case float64: @@ -553,64 +589,18 @@ func (g *OPCUAInput) createMessageFromValue(value interface{}, nodeID string) *s b = append(b, []byte(strconv.FormatUint(uint64(v), 10))...) case uint64: b = append(b, []byte(strconv.FormatUint(v, 10))...) - case []float32: - for _, val := range v { - b = append(b, []byte(strconv.FormatFloat(float64(val), 'f', -1, 32))...) - } - case []float64: - for _, val := range v { - b = append(b, []byte(strconv.FormatFloat(val, 'f', -1, 64))...) - } - case []string: - for _, val := range v { - b = append(b, []byte(string(val))...) - } - case []bool: - for _, val := range v { - b = append(b, []byte(strconv.FormatBool(val))...) - } - case []int: - for _, val := range v { - b = append(b, []byte(strconv.Itoa(val))...) - } - case []int8: - for _, val := range v { - b = append(b, []byte(strconv.FormatInt(int64(val), 10))...) - } - case []int16: - for _, val := range v { - b = append(b, []byte(strconv.FormatInt(int64(val), 10))...) - } - case []int32: - for _, val := range v { - b = append(b, []byte(strconv.FormatInt(int64(val), 10))...) - } - case []int64: - for _, val := range v { - b = append(b, []byte(strconv.FormatInt(val, 10))...) - } - case []uint: - for _, val := range v { - b = append(b, []byte(strconv.FormatUint(uint64(val), 10))...) - } - case []uint8: - for _, val := range v { - b = append(b, []byte(strconv.FormatUint(uint64(val), 10))...) - } - case []uint16: - for _, val := range v { - b = append(b, []byte(strconv.FormatUint(uint64(val), 10))...) - } - case []uint32: - for _, val := range v { - b = append(b, []byte(strconv.FormatUint(uint64(val), 10))...) - } - case []uint64: - for _, val := range v { - b = append(b, []byte(strconv.FormatUint(val, 10))...) - } default: - g.log.Errorf("Unknown type: %T", v) + // Convert unknown types to JSON + jsonBytes, err := json.Marshal(v) + if err != nil { + g.log.Errorf("Error marshaling to JSON: %v", err) + return nil + } + b = append(b, jsonBytes...) + } + + if b == nil { + g.log.Errorf("Could not create benthos message as payload is empty for node %s: %v", nodeID, b) return nil } @@ -624,6 +614,9 @@ func (g *OPCUAInput) createMessageFromValue(value interface{}, nodeID string) *s } func (g *OPCUAInput) ReadBatchPull(ctx context.Context) (service.MessageBatch, service.AckFunc, error) { + if g.client == nil { + return nil, nil, errors.New("client is nil") + } // Read all values in NodeList and return each of them as a message with the node's path as the metadata // Create first a list of all the values to read @@ -635,6 +628,10 @@ func (g *OPCUAInput) ReadBatchPull(ctx context.Context) (service.MessageBatch, s }) } + if len(g.nodeList) > 100 { + g.log.Warnf("Reading more than 100 nodes with pull method. The request might fail as it can take too much time. Recommendation: use subscribeEnabled: true instead for better performance") + } + req := &ua.ReadRequest{ MaxAge: 2000, NodesToRead: nodesToRead, @@ -686,7 +683,7 @@ func (g *OPCUAInput) ReadBatchPull(ctx context.Context) (service.MessageBatch, s g.log.Errorf("Received nil from node: %s", node.NodeID.String()) continue } - message := g.createMessageFromValue(value.Value(), node.NodeID.String()) + message := g.createMessageFromValue(value, node.NodeID.String()) if message != nil { msgs = append(msgs, message) } @@ -704,6 +701,9 @@ func (g *OPCUAInput) ReadBatchPull(ctx context.Context) (service.MessageBatch, s func (g *OPCUAInput) ReadBatchSubscribe(ctx context.Context) (service.MessageBatch, service.AckFunc, error) { var res *opcua.PublishNotificationData + if ctx == nil || ctx.Done() == nil { + return nil, nil, errors.New("emptyCtx is invalid for ReadBatchSubscribe") + } select { case res = <-g.subNotifyChan: // Received a result, check for error @@ -712,6 +712,11 @@ func (g *OPCUAInput) ReadBatchSubscribe(ctx context.Context) (service.MessageBat return nil, nil, res.Error } + if g.nodeList == nil { + g.log.Errorf("nodelist is nil") + return nil, nil, errors.New("nodelist empty") + } + // Create a message with the node's path as the metadata msgs := service.MessageBatch{} @@ -728,7 +733,7 @@ func (g *OPCUAInput) ReadBatchSubscribe(ctx context.Context) (service.MessageBat handleID := item.ClientHandle if uint32(len(g.nodeList)) >= handleID { - message := g.createMessageFromValue(item.Value.Value.Value(), g.nodeList[handleID].NodeID.String()) + message := g.createMessageFromValue(item.Value.Value, g.nodeList[handleID].NodeID.String()) if message != nil { msgs = append(msgs, message) } @@ -743,9 +748,13 @@ func (g *OPCUAInput) ReadBatchSubscribe(ctx context.Context) (service.MessageBat return nil }, nil - case <-ctx.Done(): - // Timeout occurred - g.log.Error("Timeout waiting for response from g.subNotifyChan") + case _, ok := <-ctx.Done(): + if !ok { + g.log.Errorf("timeout channel was closed") + } else { + // Timeout occurred + g.log.Error("Timeout waiting for response from g.subNotifyChan") + } return nil, nil, errors.New("timeout waiting for response") } } diff --git a/plugin/opcua_test.go b/plugin/opcua_test.go index 81447c45..bbfa3194 100644 --- a/plugin/opcua_test.go +++ b/plugin/opcua_test.go @@ -19,7 +19,9 @@ import ( "crypto/x509" "encoding/json" "encoding/pem" + "fmt" "os" + "strings" "testing" "time" @@ -29,102 +31,892 @@ import ( ) func TestAgainstSimulator(t *testing.T) { - t.Skip("This test is flaky as it can run only once per `docker-compose up`. Probably need a new OPC-UA simulator.") + endpoint := os.Getenv("TEST_WAGO_ENDPOINT_URI") + username := os.Getenv("TEST_WAGO_USERNAME") + password := os.Getenv("TEST_WAGO_PASSWORD") + + // Check if environment variables are set + if endpoint != "" || username != "" || password != "" { + t.Skip("Skipping test: environment variables are set") + return + } + t.Run("Logging Endpoints", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - var endpoints []*ua.EndpointDescription + var endpoints []*ua.EndpointDescription + var err error + + input := &OPCUAInput{ + endpoint: "opc.tcp://localhost:50000", // Important: ensure that the DNS name in the certificates of the server is also localhost (Hostname and DNS Name), as otherwise the server will refuse the connection + username: "", + password: "", + nodeIDs: nil, + insecure: false, + } + + endpoints, err = opcua.GetEndpoints(ctx, input.endpoint) + assert.NoError(t, err) + + for i, endpoint := range endpoints { + t.Logf("Endpoint %d:", i+1) + t.Logf(" EndpointURL: %s", endpoint.EndpointURL) + t.Logf(" SecurityMode: %v", endpoint.SecurityMode) + t.Logf(" SecurityPolicyURI: %s", endpoint.SecurityPolicyURI) + t.Logf(" TransportProfileURI: %s", endpoint.TransportProfileURI) + t.Logf(" SecurityLevel: %d", endpoint.SecurityLevel) + + // If Server is not nil, log its details + if endpoint.Server != nil { + t.Logf(" Server ApplicationURI: %s", endpoint.Server.ApplicationURI) + t.Logf(" Server ProductURI: %s", endpoint.Server.ProductURI) + t.Logf(" Server ApplicationName: %s", endpoint.Server.ApplicationName.Text) + t.Logf(" Server ApplicationType: %v", endpoint.Server.ApplicationType) + t.Logf(" Server GatewayServerURI: %s", endpoint.Server.GatewayServerURI) + t.Logf(" Server DiscoveryProfileURI: %s", endpoint.Server.DiscoveryProfileURI) + t.Logf(" Server DiscoveryURLs: %v", endpoint.Server.DiscoveryURLs) + } + + // Output the certificate + if len(endpoint.ServerCertificate) > 0 { + // Convert to PEM format first, then log the certificate information + pemCert := pem.EncodeToMemory(&pem.Block{ + Type: "CERTIFICATE", + Bytes: endpoint.ServerCertificate, + }) + logCertificateInfo(t, pemCert) + } + + // Loop through UserIdentityTokens + for j, token := range endpoint.UserIdentityTokens { + t.Logf(" UserIdentityToken %d:", j+1) + t.Logf(" PolicyID: %s", token.PolicyID) + t.Logf(" TokenType: %v", token.TokenType) + t.Logf(" IssuedTokenType: %s", token.IssuedTokenType) + t.Logf(" IssuerEndpointURL: %s", token.IssuerEndpointURL) + } + } + selectedEndpoint := input.getReasonableEndpoint(endpoints, ua.UserTokenTypeFromString("Anonymous"), input.insecure, "SignAndEncrypt", "Basic256Sha256") + t.Logf("selected endpoint %v:", selectedEndpoint) + if input.client != nil { + err = input.client.Close(ctx) + if err != nil { + t.Fatal(err) + } + } + }) + + t.Run("ConnectAnonymousInsecure", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var err error + + input := &OPCUAInput{ + endpoint: "opc.tcp://localhost:50000", + username: "", + password: "", + nodeIDs: nil, + insecure: true, // It only works when not using encryption + } + // Attempt to connect + err = input.Connect(ctx) + assert.NoError(t, err) + + // Close connection + if input.client != nil { + err = input.client.Close(ctx) + if err != nil { + t.Fatal(err) + } + } + }) + + t.Run("ConnectAnonymousSecure", func(t *testing.T) { + t.Skip("Secure is currently not working, as the OPC UA simulator server aborts the connection to to certificate problems") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var err error + + input := &OPCUAInput{ + endpoint: "opc.tcp://localhost:50000", + username: "", + password: "", + nodeIDs: nil, + insecure: false, + } + // Attempt to connect + err = input.Connect(ctx) + assert.NoError(t, err) + + // Close connection + if input.client != nil { + input.client.Close(ctx) + } + }) + + t.Run("Connect Username-Password fail Insecure", func(t *testing.T) { + t.Skip() // Needs to be skipped, the current OPC-UA simulator does only logging in once, after that it fails + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var err error + + input := &OPCUAInput{ + endpoint: "opc.tcp://localhost:50000", + username: "sysadmin_bad", // bad user and password + password: "demo", + insecure: true, // It only works when not using encryption + nodeIDs: nil, + } + // Attempt to connect + err = input.Connect(ctx) + assert.Error(t, err) + + // Close connection + if input.client != nil { + err = input.client.Close(ctx) + if err != nil { + t.Fatal(err) + } + } + }) + + t.Run("Connect Anonymous Insecure", func(t *testing.T) { + t.Skip() // Needs to be skipped, the current OPC-UA simulator does only logging in once, after that it fails + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var err error + + input := &OPCUAInput{ + endpoint: "opc.tcp://localhost:50000", + username: "", + password: "", + insecure: true, // It only works when not using encryption + nodeIDs: nil, + } + // Attempt to connect + err = input.Connect(ctx) + assert.Error(t, err) + + // Close connection + if input.client != nil { + err = input.client.Close(ctx) + if err != nil { + t.Fatal(err) + } + } + }) + + t.Run("Connect Username-Password success Insecure", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var err error + + input := &OPCUAInput{ + endpoint: "opc.tcp://localhost:50000", + username: "sysadmin", + password: "demo", + insecure: true, // It only works when not using encryption + nodeIDs: nil, + } + // Attempt to connect + err = input.Connect(ctx) + assert.NoError(t, err) + + // Close connection + if input.client != nil { + err = input.client.Close(ctx) + if err != nil { + t.Fatal(err) + } + } + }) + + t.Run("Connect Subscribe", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var err error + + var nodeIDStrings = []string{"ns=3;s=Fast"} + + parsedNodeIDs := ParseNodeIDs(nodeIDStrings) + + input := &OPCUAInput{ + endpoint: "opc.tcp://localhost:50000", + username: "", + password: "", + insecure: true, // It only works when not using encryption + nodeIDs: parsedNodeIDs, + subscribeEnabled: true, + } + // Attempt to connect + err = input.Connect(ctx) + assert.NoError(t, err) + + messageBatch, _, err := input.ReadBatch(ctx) + if err != nil { + t.Logf("%+v", messageBatch) + t.Fatal(err) + } + + assert.GreaterOrEqual(t, len(messageBatch), 6) + + for _, message := range messageBatch { + message, err := message.AsStructuredMut() + if err != nil { + t.Fatal(err) + } + var exampleNumber json.Number = "22.565684" + assert.IsType(t, exampleNumber, message) // it should be a number + t.Log("Received message: ", message) + } + + // Close connection + if input.client != nil { + err = input.client.Close(ctx) + if err != nil { + t.Fatal(err) + } + } + }) + + t.Run("Connect Subscribe Boolean With Properties", func(t *testing.T) { + // This test checks that properties are not browsed by default + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var err error + + var nodeIDStrings = []string{"ns=6;s=DataAccess_AnalogType_Byte"} + + parsedNodeIDs := ParseNodeIDs(nodeIDStrings) + + input := &OPCUAInput{ + endpoint: "opc.tcp://localhost:50000", + username: "", + password: "", + insecure: true, // It only works when not using encryption + nodeIDs: parsedNodeIDs, + subscribeEnabled: true, + } + // Attempt to connect + err = input.Connect(ctx) + assert.NoError(t, err) + + messageBatch, _, err := input.ReadBatch(ctx) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, 1, len(messageBatch)) + + for _, message := range messageBatch { + message, err := message.AsStructuredMut() + if err != nil { + t.Fatal(err) + } + var exampleNumber json.Number = "0" + assert.Equal(t, exampleNumber, message) // it should be 0 + t.Log("Received message: ", message) + } + + // Close connection + if input.client != nil { + err = input.client.Close(ctx) + if err != nil { + t.Fatal(err) + } + } + }) + + t.Run("Connect Subscribe AnalogTypes (simple datatypes)", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var err error + + var nodeIDStrings = []string{"ns=6;s=DataAccess_AnalogType_Byte", "ns=6;s=DataAccess_AnalogType_Double", "ns=6;s=DataAccess_AnalogType_Float", "ns=6;s=DataAccess_AnalogType_Int16", "ns=6;s=DataAccess_AnalogType_Int32", "ns=6;s=DataAccess_AnalogType_Int64", "ns=6;s=DataAccess_AnalogType_SByte", "ns=6;s=DataAccess_AnalogType_UInt16", "ns=6;s=DataAccess_AnalogType_UInt32", "ns=6;s=DataAccess_AnalogType_UInt64"} + + parsedNodeIDs := ParseNodeIDs(nodeIDStrings) + + input := &OPCUAInput{ + endpoint: "opc.tcp://localhost:50000", + username: "", + password: "", + insecure: true, // It only works when not using encryption + nodeIDs: parsedNodeIDs, + subscribeEnabled: true, + } + // Attempt to connect + err = input.Connect(ctx) + assert.NoError(t, err) + + messageBatch, _, err := input.ReadBatch(ctx) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, 10, len(messageBatch)) + + for _, message := range messageBatch { + message, err := message.AsStructuredMut() + if err != nil { + t.Fatal(err) + } + var exampleNumber json.Number = "22.565684" + assert.IsType(t, exampleNumber, message) // it should be a number + t.Log("Received message: ", message) + } + + // Close connection + if input.client != nil { + err = input.client.Close(ctx) + if err != nil { + t.Fatal(err) + } + } + }) + + t.Run("Connect Subscribe null", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var err error + + var nodeIDStrings []string = []string{"ns=6;s=DataAccess_DataItem_Null"} + + parsedNodeIDs := ParseNodeIDs(nodeIDStrings) + + input := &OPCUAInput{ + endpoint: "opc.tcp://localhost:50000", + username: "", + password: "", + insecure: true, // It only works when not using encryption + nodeIDs: parsedNodeIDs, + subscribeEnabled: true, + } + // Attempt to connect + err = input.Connect(ctx) + assert.NoError(t, err) + + messageBatch, _, err := input.ReadBatch(ctx) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, 0, len(messageBatch)) // should never subscribe to null datatype + + // Close connection + if input.client != nil { + input.client.Close(ctx) + } + }) + + t.Run("Connect Subscribe AnalogTypeArray", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var err error + + var nodeIDStrings []string = []string{"ns=6;s=DataAccess_AnalogType_Array"} + + parsedNodeIDs := ParseNodeIDs(nodeIDStrings) + + input := &OPCUAInput{ + endpoint: "opc.tcp://localhost:50000", + username: "", + password: "", + insecure: true, // It only works when not using encryption + nodeIDs: parsedNodeIDs, + subscribeEnabled: true, + } + // Attempt to connect + err = input.Connect(ctx) + assert.NoError(t, err) + + messageBatch, _, err := input.ReadBatch(ctx) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, 26, len(messageBatch)) // Adjust the expected number of items as necessary + + for _, message := range messageBatch { + messageParsed, err := message.AsStructuredMut() + if err != nil { + // This might happen if an empty string is returned from OPC-UA + continue + } + + opcuapath, found := message.MetaGet("opcua_path") + if !found { + t.Fatal("Could not find opcua_path") + } + + // Determine the data type from the OPC UA path + dataType := strings.Split(opcuapath, "_")[5] // This will extract the data type part of the OPC UA path + t.Log(dataType) + + // Check if the data type is an array and handle accordingly + if strings.HasSuffix(dataType, "Array") { + dataTypeOfArray := strings.Split(opcuapath, "_")[6] + t.Log(dataTypeOfArray) + // Handle array data types + switch dataTypeOfArray { + case "Duration", "Guid", "LocaleId", "Boolean", "LocalizedText", "NodeId", "QualifiedName", "UtcTime", "DateTime", "Double", "Enumeration", "Float", "Int16", "Int32", "Int64", "Integer", "Number", "SByte", "StatusCode", "String", "UInt16", "UInt32", "UInt64", "UInteger", "Variant", "XmlElement", "ByteString": + // Check if the messageParsed is of type slice (array) + messageParsedArray, ok := messageParsed.([]interface{}) + if !ok { + t.Errorf("Expected messageParsed to be an array, but got %T: %s : %s", messageParsed, opcuapath, messageParsed) + } else { + for _, item := range messageParsedArray { + + // Add checking based on the OPC UA path the resulting data type + checkDatatypeOfOPCUATag(t, dataTypeOfArray, item, opcuapath) + } + } + case "Byte": + checkDatatypeOfOPCUATag(t, "ByteArray", messageParsed, opcuapath) + default: + t.Errorf("Unsupported array data type in OPC UA path: %s:%s", dataType, opcuapath) + } + } else { + t.Fatalf("Received non-array: %s", opcuapath) + } + } + + // Close connection + if input.client != nil { + input.client.Close(ctx) + } + }) + + t.Run("Connect Subscribe DataItem", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var err error + + var nodeIDStrings = []string{"ns=6;s=DataAccess_DataItem"} // it will subscribe to all values with data type that is non-null. + + parsedNodeIDs := ParseNodeIDs(nodeIDStrings) + + input := &OPCUAInput{ + endpoint: "opc.tcp://localhost:50000", + username: "", + password: "", + insecure: true, // It only works when not using encryption + nodeIDs: parsedNodeIDs, + subscribeEnabled: true, + } + // Attempt to connect + err = input.Connect(ctx) + assert.NoError(t, err) + + messageBatch, _, err := input.ReadBatch(ctx) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, 23, len(messageBatch)) // these are theoretically >30, but most of them are null, so the browse function ignores them + + for _, message := range messageBatch { + messageParsed, err := message.AsStructuredMut() + if err != nil { + // This might happen if an empty string is returned from OPC-UA + continue + } + + opcuapath, found := message.MetaGet("opcua_path") + if found != true { + t.Fatal("Could not find opcua_path") + } + + // Determine the data type from the OPC UA path + dataType := strings.Split(opcuapath, "_")[5] // This will extract the data type part of the OPC UA path + + // Add checking based on the OPC UA path the resulting data type + checkDatatypeOfOPCUATag(t, dataType, messageParsed, opcuapath) + } + + // Close connection + if input.client != nil { + err = input.client.Close(ctx) + if err != nil { + t.Fatal(err) + } + } + }) + + t.Run("TestForFailedNodeCrash", func(t *testing.T) { + // https://github.com/united-manufacturing-hub/MgmtIssues/issues/1088 + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var err error + + var nodeIDStrings = []string{ + "ns=3;s=Fast", + "ns=3;s=Slow", + } + parsedNodeIDs := ParseNodeIDs(nodeIDStrings) + input := &OPCUAInput{ + endpoint: "opc.tcp://localhost:50000", // Important: ensure that the DNS name in the certificates of the server is also localhost (Hostname and DNS Name), as otherwise the server will refuse the connection + username: "", + password: "", + nodeIDs: parsedNodeIDs, + insecure: true, + subscribeEnabled: true, + } + + // Attempt to connect + err = input.Connect(ctx) + assert.NoError(t, err) + + messageBatch, _, err := input.ReadBatch(ctx) + if err != nil { + t.Fatal(err) + } + + assert.NotEmpty(t, messageBatch) + + for _, message := range messageBatch { + _, err := message.AsStructured() + if err != nil { + t.Fatal(err) + } + } + + // Close connection + if input.client != nil { + err = input.client.Close(ctx) + if err != nil { + t.Fatal(err) + } + } + }) + + t.Run("Connect Subscribe Scalar Arrays", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + var err error + + var nodeIDStrings []string = []string{"ns=6;s=Scalar_Static_Arrays"} // it will subscribe to all values with data type that is non-null. + + parsedNodeIDs := ParseNodeIDs(nodeIDStrings) + + input := &OPCUAInput{ + endpoint: "opc.tcp://localhost:50000", + username: "", + password: "", + insecure: true, // It only works when not using encryption + nodeIDs: parsedNodeIDs, + subscribeEnabled: true, + } + // Attempt to connect + err = input.Connect(ctx) + assert.NoError(t, err) + + messageBatch, _, err := input.ReadBatch(ctx) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, 26, len(messageBatch)) + + for _, message := range messageBatch { + messageParsed, err := message.AsStructuredMut() + if err != nil { + // This might happen if an empty string is returned from OPC-UA + continue + } + + opcuapath, found := message.MetaGet("opcua_path") + if found != true { + t.Fatal("Could not find opcua_path") + } + + // Determine the data type from the OPC UA path + dataType := strings.Split(opcuapath, "_")[5] // This will extract the data type part of the OPC UA path + t.Log(dataType) + + // Check if the data type is an array and handle accordingly + if strings.HasSuffix(dataType, "Arrays") { + dataTypeOfArray := strings.Split(opcuapath, "_")[6] + t.Log(dataTypeOfArray) + // Handle array data types + switch dataTypeOfArray { + case "Duration", "Guid", "LocaleId", "Boolean", "LocalizedText", "NodeId", "QualifiedName", "UtcTime", "DateTime", "Double", "Enumeration", "Float", "Int16", "Int32", "Int64", "SByte", "StatusCode", "String", "UInt16", "UInt32", "UInt64", "XmlElement", "ByteString": + // Check if the messageParsed is of type slice (array) + messageParsedArray, ok := messageParsed.([]interface{}) + if !ok { + t.Errorf("Expected messageParsed to be an array, but got %T: %s : %s", messageParsed, opcuapath, messageParsed) + } else { + for _, item := range messageParsedArray { + + // Add checking based on the OPC UA path the resulting data type + checkDatatypeOfOPCUATag(t, dataTypeOfArray, item, opcuapath) + } + } + case "Byte": + checkDatatypeOfOPCUATag(t, "ByteArray", messageParsed, opcuapath) + case "Integer", "Number", "Variant", "UInteger": // Variant Arrays are not supported by go upcua lib + t.Logf("Unsupported array data type in OPC UA path: %s:%s", dataType, opcuapath) + default: + t.Errorf("Unsupported array data type in OPC UA path: %s:%s", dataType, opcuapath) + } + } else { + t.Fatalf("Received non-array: %s", opcuapath) + } + } + + // Close connection + if input.client != nil { + input.client.Close(ctx) + } + }) + + t.Run("Connect Subscribe does not fail when subscribing to entire simulator", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var err error + + var nodeIDStrings []string = []string{"ns=3;s=OpcPlc"} + + parsedNodeIDs := ParseNodeIDs(nodeIDStrings) + + input := &OPCUAInput{ + endpoint: "opc.tcp://localhost:50000", + username: "", + password: "", + insecure: true, // It only works when not using encryption + nodeIDs: parsedNodeIDs, + subscribeEnabled: false, // set to false because some values will change more often in a second resulting in too many messages + } + // Attempt to connect + err = input.Connect(ctx) + assert.NoError(t, err) + + messageBatch, _, err := input.ReadBatch(ctx) + if err != nil { + t.Fatal(err) + } + + assert.GreaterOrEqual(t, len(messageBatch), 125) + + // Close connection + if input.client != nil { + input.client.Close(ctx) + } + }) + + t.Run("Connect does not fail when subscribing to everything", func(t *testing.T) { + t.Skip("This might take too long...") + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + var err error + + var nodeIDStrings []string = []string{"i=84"} + + parsedNodeIDs := ParseNodeIDs(nodeIDStrings) + + input := &OPCUAInput{ + endpoint: "opc.tcp://localhost:50000", + username: "", + password: "", + insecure: true, // It only works when not using encryption + nodeIDs: parsedNodeIDs, + subscribeEnabled: true, + } + // Attempt to connect + err = input.Connect(ctx) + assert.NoError(t, err) + + messageBatch, _, err := input.ReadBatch(ctx) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, 25, len(messageBatch)) + + // Close connection + if input.client != nil { + input.client.Close(ctx) + } + }) + + t.Run("Connect Subscribe does not subscribe to objects", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + var err error + var nodeIDStrings []string = []string{"ns=3;s=SimulatorConfiguration"} + + parsedNodeIDs := ParseNodeIDs(nodeIDStrings) + input := &OPCUAInput{ - endpoint: "opc.tcp://localhost:46010", - username: "", - password: "", - nodeIDs: nil, + endpoint: "opc.tcp://localhost:50000", + username: "", + password: "", + insecure: true, // It only works when not using encryption + nodeIDs: parsedNodeIDs, + subscribeEnabled: true, + } + // Attempt to connect + err = input.Connect(ctx) + if assert.Error(t, err) { + assert.Equal(t, fmt.Errorf("no valid nodes selected"), err) } - endpoints, err = opcua.GetEndpoints(ctx, input.endpoint) + // Close connection + if input.client != nil { + input.client.Close(ctx) + } + }) + + t.Run("Connect Subscribe does not fail when subscribing to Anomaly", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var err error + + var nodeIDStrings []string = []string{"ns=3;s=Anomaly"} + + parsedNodeIDs := ParseNodeIDs(nodeIDStrings) + + input := &OPCUAInput{ + endpoint: "opc.tcp://localhost:50000", + username: "", + password: "", + insecure: true, // It only works when not using encryption + nodeIDs: parsedNodeIDs, + subscribeEnabled: false, // disabling subscribe because messages change moreo ften than once in a second reuslting in to many messages + } + // Attempt to connect + err = input.Connect(ctx) assert.NoError(t, err) - for i, endpoint := range endpoints { - t.Logf("Endpoint %d:", i+1) - t.Logf(" EndpointURL: %s", endpoint.EndpointURL) - t.Logf(" SecurityMode: %v", endpoint.SecurityMode) - t.Logf(" SecurityPolicyURI: %s", endpoint.SecurityPolicyURI) - t.Logf(" TransportProfileURI: %s", endpoint.TransportProfileURI) - t.Logf(" SecurityLevel: %d", endpoint.SecurityLevel) + messageBatch, _, err := input.ReadBatch(ctx) + if err != nil { + t.Fatal(err) + } - // If Server is not nil, log its details - if endpoint.Server != nil { - t.Logf(" Server ApplicationURI: %s", endpoint.Server.ApplicationURI) - t.Logf(" Server ProductURI: %s", endpoint.Server.ProductURI) - t.Logf(" Server ApplicationName: %s", endpoint.Server.ApplicationName.Text) - t.Logf(" Server ApplicationType: %v", endpoint.Server.ApplicationType) - t.Logf(" Server GatewayServerURI: %s", endpoint.Server.GatewayServerURI) - t.Logf(" Server DiscoveryProfileURI: %s", endpoint.Server.DiscoveryProfileURI) - t.Logf(" Server DiscoveryURLs: %v", endpoint.Server.DiscoveryURLs) - } + assert.Equal(t, 4, len(messageBatch)) - // Output the certificate - if len(endpoint.ServerCertificate) > 0 { - // Convert to PEM format first, then log the certificate information - pemCert := pem.EncodeToMemory(&pem.Block{ - Type: "CERTIFICATE", - Bytes: endpoint.ServerCertificate, - }) - logCertificateInfo(t, pemCert) - } + // Close connection + if input.client != nil { + input.client.Close(ctx) + } + }) - // Loop through UserIdentityTokens - for j, token := range endpoint.UserIdentityTokens { - t.Logf(" UserIdentityToken %d:", j+1) - t.Logf(" PolicyID: %s", token.PolicyID) - t.Logf(" TokenType: %v", token.TokenType) - t.Logf(" IssuedTokenType: %s", token.IssuedTokenType) - t.Logf(" IssuerEndpointURL: %s", token.IssuerEndpointURL) - } + t.Run("Connect Subscribe does not fail when subscribing to Basic", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var err error + + var nodeIDStrings []string = []string{"ns=3;s=Basic"} + + parsedNodeIDs := ParseNodeIDs(nodeIDStrings) + + input := &OPCUAInput{ + endpoint: "opc.tcp://localhost:50000", + username: "", + password: "", + insecure: true, // It only works when not using encryption + nodeIDs: parsedNodeIDs, + subscribeEnabled: false, // disabling subscribe because messages change moreo ften than once in a second reuslting in to many messages + } + // Attempt to connect + err = input.Connect(ctx) + assert.NoError(t, err) + + messageBatch, _, err := input.ReadBatch(ctx) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, 4, len(messageBatch)) + + // Close connection + if input.client != nil { + input.client.Close(ctx) } }) - t.Run("ConnectAnonymous", func(t *testing.T) { + t.Run("Connect Subscribe does not fail when subscribing to Deterministic GUID", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() var err error + var nodeIDStrings []string = []string{"ns=3;s=Deterministic GUID"} + + parsedNodeIDs := ParseNodeIDs(nodeIDStrings) + input := &OPCUAInput{ - endpoint: "opc.tcp://localhost:46010", - username: "", - password: "", - nodeIDs: nil, + endpoint: "opc.tcp://localhost:50000", + username: "", + password: "", + insecure: true, // It only works when not using encryption + nodeIDs: parsedNodeIDs, + subscribeEnabled: false, } // Attempt to connect err = input.Connect(ctx) assert.NoError(t, err) + messageBatch, _, err := input.ReadBatch(ctx) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, 5, len(messageBatch)) + // Close connection if input.client != nil { input.client.Close(ctx) } }) - t.Run("Connect Username-Password fail", func(t *testing.T) { - t.Skip() // Needs to be skipped, the current OPC-UA simulator does only logging in once, after that it fails + t.Run("Connect Subscribe does not fail when subscribing to Fast", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() var err error + var nodeIDStrings []string = []string{"ns=3;s=Fast"} + + parsedNodeIDs := ParseNodeIDs(nodeIDStrings) + input := &OPCUAInput{ - endpoint: "opc.tcp://localhost:46010", - username: "123", // bad user and password - password: "123", - nodeIDs: nil, + endpoint: "opc.tcp://localhost:50000", + username: "", + password: "", + insecure: true, // It only works when not using encryption + nodeIDs: parsedNodeIDs, + subscribeEnabled: false, } // Attempt to connect err = input.Connect(ctx) - assert.Error(t, err) + assert.NoError(t, err) + + messageBatch, _, err := input.ReadBatch(ctx) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, 6, len(messageBatch)) // Close connection if input.client != nil { @@ -132,22 +924,70 @@ func TestAgainstSimulator(t *testing.T) { } }) - t.Run("Connect Username-Password success", func(t *testing.T) { + t.Run("Connect Subscribe does not fail when subscribing to Slow", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() var err error + var nodeIDStrings []string = []string{"ns=3;s=Slow"} + + parsedNodeIDs := ParseNodeIDs(nodeIDStrings) + input := &OPCUAInput{ - endpoint: "opc.tcp://localhost:46010", - username: "root", - password: "secret", - nodeIDs: nil, + endpoint: "opc.tcp://localhost:50000", + username: "", + password: "", + insecure: true, // It only works when not using encryption + nodeIDs: parsedNodeIDs, + subscribeEnabled: false, + } + // Attempt to connect + err = input.Connect(ctx) + assert.NoError(t, err) + + messageBatch, _, err := input.ReadBatch(ctx) + if err != nil { + t.Fatal(err) + } + + assert.GreaterOrEqual(t, len(messageBatch), 100) + + // Close connection + if input.client != nil { + input.client.Close(ctx) + } + }) + + t.Run("Connect Subscribe does not fail when subscribing to Special", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var err error + + var nodeIDStrings []string = []string{"ns=3;s=Special"} + + parsedNodeIDs := ParseNodeIDs(nodeIDStrings) + + input := &OPCUAInput{ + endpoint: "opc.tcp://localhost:50000", + username: "", + password: "", + insecure: true, // It only works when not using encryption + nodeIDs: parsedNodeIDs, + subscribeEnabled: false, } // Attempt to connect err = input.Connect(ctx) assert.NoError(t, err) + messageBatch, _, err := input.ReadBatch(ctx) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, 7, len(messageBatch)) + // Close connection if input.client != nil { input.client.Close(ctx) @@ -156,6 +996,195 @@ func TestAgainstSimulator(t *testing.T) { } +func checkDatatypeOfOPCUATag(t *testing.T, dataType string, messageParsed any, opcuapath string) { + t.Logf("%s, %+v, %s", dataType, messageParsed, opcuapath) + switch dataType { + case "Boolean": + var expectedType bool + assert.IsType(t, expectedType, messageParsed) + t.Log("Received Boolean message: ", messageParsed) + + case "Byte": + var expectedType json.Number + assert.IsType(t, expectedType, messageParsed) + t.Log("Received Byte message: ", messageParsed) + + case "DateTime": //warning: there is a bug when the date is a lot in the future year 30828 + var expectedType string + assert.IsType(t, expectedType, messageParsed) + t.Log("Received DateTime message: ", messageParsed) + + case "Double": + var expectedType json.Number + assert.IsType(t, expectedType, messageParsed) + t.Log("Received Double message: ", messageParsed) + + case "Enumeration": + var expectedType json.Number + assert.IsType(t, expectedType, messageParsed) + t.Log("Received Enumeration message: ", messageParsed) + case "ExpandedNodeId": + // Assert that messageParsed is of type map[string]interface{} + parsedMap, ok := messageParsed.(map[string]interface{}) + if !ok { + t.Errorf("Expected messageParsed to be of type map[string]interface{}, but got %T", messageParsed) + } else { + // Check if the keys inside the map are correct + expectedKeys := []string{"NamespaceURI", "NodeID", "ServerIndex"} + for _, key := range expectedKeys { + if _, exists := parsedMap[key]; !exists { + t.Errorf("Expected key %s missing in messageParsed", key) + } + } + + // Optionally, log the received message + t.Log("Received ExpandedNodeId message: ", messageParsed) + } + case "Float": + var expectedType json.Number + assert.IsType(t, expectedType, messageParsed) + t.Log("Received Float message: ", messageParsed) + + case "Guid": + // Assert that messageParsed is of type map[string]interface{} + parsedMap, ok := messageParsed.(map[string]interface{}) + if !ok { + t.Errorf("Expected messageParsed to be of type map[string]interface{}, but got %T", messageParsed) + } else { + // Check if the keys inside the map are correct + expectedKeys := []string{"Data1", "Data2", "Data3", "Data4"} + for _, key := range expectedKeys { + if _, exists := parsedMap[key]; !exists { + t.Errorf("Expected key %s missing in messageParsed", key) + } + } + + // Optionally, log the received message + t.Log("Received GUID message: ", messageParsed) + } + case "Int16": + var expectedType json.Number + assert.IsType(t, expectedType, messageParsed) + t.Log("Received Int16 message: ", messageParsed) + + case "Int32": + var expectedType json.Number + assert.IsType(t, expectedType, messageParsed) + t.Log("Received Int32 message: ", messageParsed) + + case "Int64": + var expectedType json.Number + assert.IsType(t, expectedType, messageParsed) + t.Log("Received Int64 message: ", messageParsed) + + case "Integer": + var expectedType json.Number + assert.IsType(t, expectedType, messageParsed) + t.Log("Received Integer message: ", messageParsed) + + case "LocalizedText": + // Assert that messageParsed is of type map[string]interface{} + parsedMap, ok := messageParsed.(map[string]interface{}) + if !ok { + t.Errorf("Expected messageParsed to be of type map[string]interface{}, but got %T", messageParsed) + } else { + // Check if the keys inside the map are correct + expectedKeys := []string{"EncodingMask", "Locale", "Text"} + for _, key := range expectedKeys { + if _, exists := parsedMap[key]; !exists { + t.Errorf("Expected key %s missing in messageParsed", key) + } + } + + // Optionally, log the received message + t.Log("Received LocalizedText message: ", messageParsed) + } + case "NodeId": + var expectedType string + assert.IsType(t, expectedType, messageParsed) + t.Log("Received NodeId message: ", messageParsed) + case "Number": + var expectedType json.Number + assert.IsType(t, expectedType, messageParsed) + t.Log("Received Number message: ", messageParsed) + case "QualifiedName": + // Assert that messageParsed is of type map[string]interface{} + parsedMap, ok := messageParsed.(map[string]interface{}) + if !ok { + t.Errorf("Expected messageParsed to be of type map[string]interface{}, but got %T", messageParsed) + } else { + // Define the keys expected in a QualifiedName message + expectedKeys := []string{"NamespaceIndex", "Name"} + for _, key := range expectedKeys { + if _, exists := parsedMap[key]; !exists { + t.Errorf("Expected key %s missing in messageParsed", key) + } + } + + // Optionally, log the received message + t.Log("Received QualifiedName message: ", messageParsed) + } + case "SByte": + var expectedType json.Number + assert.IsType(t, expectedType, messageParsed) + t.Log("Received SByte message: ", messageParsed) + case "StatusCode": + var expectedType json.Number + assert.IsType(t, expectedType, messageParsed) + t.Log("Received StatusCode message: ", messageParsed) + case "String": + var expectedType string + assert.IsType(t, expectedType, messageParsed) + t.Log("Received String message: ", messageParsed) + case "UInt16": + var expectedType json.Number + assert.IsType(t, expectedType, messageParsed) + t.Log("Received UInt16 message: ", messageParsed) + case "UInt32": + var expectedType json.Number + assert.IsType(t, expectedType, messageParsed) + t.Log("Received UInt32 message: ", messageParsed) + case "UInt64": + var expectedType json.Number + assert.IsType(t, expectedType, messageParsed) + t.Log("Received UInt64 message: ", messageParsed) + case "UInteger": + var expectedType json.Number + assert.IsType(t, expectedType, messageParsed) + t.Log("Received UInteger message: ", messageParsed) + case "ByteArray": // case as an array of bytes is not a number, but a string + var expectedType string + assert.IsType(t, expectedType, messageParsed) + t.Log("Received ByteArray message: ", messageParsed) + case "ByteString": + var expectedType string + assert.IsType(t, expectedType, messageParsed) + t.Log("Received ByteString message: ", messageParsed) + case "Duration": + var expectedType json.Number + assert.IsType(t, expectedType, messageParsed) + t.Log("Received Duration message: ", messageParsed) + case "LocaleId": + var expectedType string + assert.IsType(t, expectedType, messageParsed) + t.Log("Received LocaleId message: ", messageParsed) + case "UtcTime": + var expectedType string + assert.IsType(t, expectedType, messageParsed) + t.Log("Received UtcTime message: ", messageParsed) + case "Variant": + var expectedType map[string]interface{} + assert.IsType(t, expectedType, messageParsed) + t.Log("Received Variant message: ", messageParsed) + case "XmlElement": + var expectedType string + assert.IsType(t, expectedType, messageParsed) + t.Log("Received XmlElement message: ", messageParsed) + default: + t.Errorf("Unsupported data type in OPC UA path: %s:%s", dataType, opcuapath) + } +} + func TestAgainstRemoteInstance(t *testing.T) { // These information can be found in Bitwarden under WAGO PLC @@ -187,7 +1216,10 @@ func TestAgainstRemoteInstance(t *testing.T) { // Close connection if input.client != nil { - input.client.Close(ctx) + err = input.client.Close(ctx) + if err != nil { + t.Fatal(err) + } } }) @@ -211,7 +1243,10 @@ func TestAgainstRemoteInstance(t *testing.T) { // Close connection if input.client != nil { - input.client.Close(ctx) + err = input.client.Close(ctx) + if err != nil { + t.Fatal(err) + } } }) @@ -233,7 +1268,10 @@ func TestAgainstRemoteInstance(t *testing.T) { // Close connection if input.client != nil { - input.client.Close(ctx) + err = input.client.Close(ctx) + if err != nil { + t.Fatal(err) + } } }) @@ -255,7 +1293,10 @@ func TestAgainstRemoteInstance(t *testing.T) { // Close connection if input.client != nil { - input.client.Close(ctx) + err = input.client.Close(ctx) + if err != nil { + t.Fatal(err) + } } }) @@ -265,7 +1306,7 @@ func TestAgainstRemoteInstance(t *testing.T) { var err error - var nodeIDStrings []string = []string{"ns=4;s=|var|WAGO 750-8101 PFC100 CS 2ETH.Application.GVL"} + var nodeIDStrings = []string{"ns=4;s=|var|WAGO 750-8101 PFC100 CS 2ETH.Application.GVL"} parsedNodeIDs := ParseNodeIDs(nodeIDStrings) @@ -281,7 +1322,10 @@ func TestAgainstRemoteInstance(t *testing.T) { // Close connection if input.client != nil { - input.client.Close(ctx) + err = input.client.Close(ctx) + if err != nil { + t.Fatal(err) + } } }) @@ -291,7 +1335,7 @@ func TestAgainstRemoteInstance(t *testing.T) { var err error - var nodeIDStrings []string = []string{"ns=4;s=|var|WAGO 750-8101 PFC100 CS 2ETH.Application.GVL"} + var nodeIDStrings = []string{"ns=4;s=|var|WAGO 750-8101 PFC100 CS 2ETH.Application.GVL"} parsedNodeIDs := ParseNodeIDs(nodeIDStrings) @@ -324,7 +1368,10 @@ func TestAgainstRemoteInstance(t *testing.T) { // Close connection if input.client != nil { - input.client.Close(ctx) + err = input.client.Close(ctx) + if err != nil { + t.Fatal(err) + } } }) @@ -334,7 +1381,7 @@ func TestAgainstRemoteInstance(t *testing.T) { var err error - var nodeIDStrings []string = []string{"ns=4;s=|var|WAGO 750-8101 PFC100 CS 2ETH.Application.GVL", "ns=4;s=|vprop|WAGO 750-8101 PFC100 CS 2ETH.Application.RevisionCounter"} + var nodeIDStrings = []string{"ns=4;s=|var|WAGO 750-8101 PFC100 CS 2ETH.Application.GVL", "ns=4;s=|vprop|WAGO 750-8101 PFC100 CS 2ETH.Application.RevisionCounter"} parsedNodeIDs := ParseNodeIDs(nodeIDStrings) @@ -390,7 +1437,10 @@ func TestAgainstRemoteInstance(t *testing.T) { // Close connection if input.client != nil { - input.client.Close(ctx) + err = input.client.Close(ctx) + if err != nil { + t.Fatal(err) + } } }) @@ -400,7 +1450,7 @@ func TestAgainstRemoteInstance(t *testing.T) { var err error - var nodeIDStrings []string = []string{"ns=4;s=|var|WAGO 750-8101 PFC100 CS 2ETH.Application.GVL"} + var nodeIDStrings = []string{"ns=4;s=|var|WAGO 750-8101 PFC100 CS 2ETH.Application.GVL"} parsedNodeIDs := ParseNodeIDs(nodeIDStrings) @@ -434,7 +1484,10 @@ func TestAgainstRemoteInstance(t *testing.T) { // Close connection if input.client != nil { - input.client.Close(ctx) + err = input.client.Close(ctx) + if err != nil { + t.Fatal(err) + } } }) @@ -444,7 +1497,7 @@ func TestAgainstRemoteInstance(t *testing.T) { var err error - var nodeIDStrings []string = []string{"ns=4;s=|var|WAGO 750-8101 PFC100 CS 2ETH.Application.GVL"} + var nodeIDStrings = []string{"ns=4;s=|var|WAGO 750-8101 PFC100 CS 2ETH.Application.GVL"} parsedNodeIDs := ParseNodeIDs(nodeIDStrings) @@ -480,7 +1533,10 @@ func TestAgainstRemoteInstance(t *testing.T) { // Close connection if input.client != nil { - input.client.Close(ctx) + err = input.client.Close(ctx) + if err != nil { + t.Fatal(err) + } } }) @@ -590,7 +1646,7 @@ func TestGetReasonableEndpoint_Insecure(t *testing.T) { t.Errorf("Expected selected endpoint to have no encryption, but got %v", selectedEndpoint.SecurityMode) } } else { - t.Error("Expected a reasonable endpoint, but got nil") + t.Fatalf("Expected a reasonable endpoint, but got nil") // This needs to be fatal, to prevent nil error in selectedEndpoint2 check } input2 := &OPCUAInput{ diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml index 54451436..9f5b9752 100644 --- a/tests/docker-compose.yaml +++ b/tests/docker-compose.yaml @@ -1,11 +1,23 @@ -version: '3.9' +version: "3.8" services: - ioTSensorsOPCUA: - image: ghcr.io/united-manufacturing-hub/opcuasimulator:latest - restart: always - container_name: iot-sensors - hostname: ioTSensorsOPCUA - volumes: - - ./config.json:/configs/config.json + opcplc: + image: mcr.microsoft.com/iotedge/opc-plc:2.9.11 + hostname: localhost + command: + - --pn=50000 + - --autoaccept + - --sph + - --sn=100 + - --sr=10 + - --st=uint + - --fn=5 + - --fr=1 + - --ft=uint + - --gn=5 + - --certdnsnames=localhost + - --plchostname=localhost + - --unsecuretransport ports: - - 46010:46010 + - "50000:50000" + - "8080:8080" + restart: unless-stopped